diff --git a/client/src/main/java/org/elasticsearch/client/Connection.java b/client/src/main/java/org/elasticsearch/client/Connection.java index 3d48a4eeae1..f17ff69e14c 100644 --- a/client/src/main/java/org/elasticsearch/client/Connection.java +++ b/client/src/main/java/org/elasticsearch/client/Connection.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; * Any change to the state of a connection should be made through the connection pool. */ public class Connection { - //TODO make these values configurable through the connection pool? private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); private static final long MAX_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30); private final HttpHost host; diff --git a/client/src/main/java/org/elasticsearch/client/ConnectionPool.java b/client/src/main/java/org/elasticsearch/client/ConnectionPool.java deleted file mode 100644 index 55e9554684e..00000000000 --- a/client/src/main/java/org/elasticsearch/client/ConnectionPool.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpHost; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Pool of connections to the different hosts that belong to an elasticsearch cluster. - * It keeps track of the different hosts to communicate with and allows to retrieve an iterator of connections to be used - * for each request. Marks connections as dead/alive when needed. - * Provides an iterator of connections to be used at each {@link #nextConnection()} call. - * The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive. - * The {@link #onFailure(Connection)} method marks the connection provided as an argument dead. - * This base implementation doesn't define the list implementation that stores connections, so that concurrency can be - * handled in subclasses depending on the usecase (e.g. defining the list volatile or final when needed). - */ -public abstract class ConnectionPool implements Closeable { - - private static final Log logger = LogFactory.getLog(ConnectionPool.class); - - private final AtomicInteger lastConnectionIndex = new AtomicInteger(0); - - /** - * Allows to retrieve the concrete list of connections. Not defined directly as a member - * of this class as subclasses may need to handle concurrency if the list can change, for - * instance defining the field as volatile. On the other hand static implementations - * can just make the list final instead. - */ - protected abstract List getConnections(); - - /** - * Returns an iterator of connections that should be used for a request call. - * Ideally, the first connection is retrieved from the iterator and used successfully for the request. - * Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried. - * The maximum total of attempts is equal to the number of connections that are available in the iterator. - * The iterator returned will never be empty, rather an {@link IllegalStateException} will be thrown in that case. - * In case there are no alive connections available, or dead ones that should be retried, one dead connection - * gets resurrected and returned. - */ - public final Iterator nextConnection() { - List connections = getConnections(); - if (connections.isEmpty()) { - throw new IllegalStateException("no connections available in the connection pool"); - } - - List rotatedConnections = new ArrayList<>(connections); - //TODO is it possible to make this O(1)? (rotate is O(n)) - Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement()); - Iterator connectionIterator = rotatedConnections.iterator(); - while (connectionIterator.hasNext()) { - Connection connection = connectionIterator.next(); - if (connection.isAlive() == false && connection.shouldBeRetried() == false) { - connectionIterator.remove(); - } - } - if (rotatedConnections.isEmpty()) { - List sortedConnections = new ArrayList<>(connections); - Collections.sort(sortedConnections, new Comparator() { - @Override - public int compare(Connection o1, Connection o2) { - return Long.compare(o1.getDeadUntil(), o2.getDeadUntil()); - } - }); - Connection connection = sortedConnections.get(0); - connection.markResurrected(); - logger.trace("marked connection resurrected for " + connection.getHost()); - return Collections.singleton(connection).iterator(); - } - return rotatedConnections.iterator(); - } - - /** - * Helper method to be used by subclasses when needing to create a new list - * of connections given their corresponding hosts - */ - protected final List createConnections(HttpHost... hosts) { - List connections = new ArrayList<>(); - for (HttpHost host : hosts) { - Objects.requireNonNull(host, "host cannot be null"); - connections.add(new Connection(host)); - } - return Collections.unmodifiableList(connections); - } - - /** - * Called after each successful request call. - * Receives as an argument the connection that was used for the successful request. - */ - public void onSuccess(Connection connection) { - connection.markAlive(); - logger.trace("marked connection alive for " + connection.getHost()); - } - - /** - * Called after each failed attempt. - * Receives as an argument the connection that was used for the failed attempt. - */ - public void onFailure(Connection connection) throws IOException { - connection.markDead(); - logger.debug("marked connection dead for " + connection.getHost()); - } -} diff --git a/client/src/main/java/org/elasticsearch/client/RestClient.java b/client/src/main/java/org/elasticsearch/client/RestClient.java index c4d6ede623e..f578d7ce304 100644 --- a/client/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/src/main/java/org/elasticsearch/client/RestClient.java @@ -39,38 +39,51 @@ import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public final class RestClient implements Closeable { private static final Log logger = LogFactory.getLog(RestClient.class); private final CloseableHttpClient client; - private final ConnectionPool connectionPool; private final long maxRetryTimeout; + private final AtomicInteger lastConnectionIndex = new AtomicInteger(0); + private volatile List connections; + private volatile FailureListener failureListener = new FailureListener(); - private RestClient(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) { + private RestClient(CloseableHttpClient client, long maxRetryTimeout, HttpHost... hosts) { this.client = client; - this.connectionPool = connectionPool; this.maxRetryTimeout = maxRetryTimeout; + setNodes(hosts); + } + + public synchronized void setNodes(HttpHost... hosts) { + List connections = new ArrayList<>(hosts.length); + for (HttpHost host : hosts) { + Objects.requireNonNull(host, "host cannot be null"); + connections.add(new Connection(host)); + } + this.connections = Collections.unmodifiableList(connections); } public ElasticsearchResponse performRequest(String method, String endpoint, Map params, HttpEntity entity) throws IOException { URI uri = buildUri(endpoint, params); HttpRequestBase request = createHttpRequest(method, uri, entity); - return performRequest(request, connectionPool.nextConnection()); - } - - private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator connectionIterator) throws IOException { //we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98); IOException lastSeenException = null; long startTime = System.nanoTime(); - + Iterator connectionIterator = nextConnection(); while (connectionIterator.hasNext()) { Connection connection = connectionIterator.next(); @@ -83,6 +96,7 @@ public final class RestClient implements Closeable { retryTimeoutException.addSuppressed(lastSeenException); throw retryTimeoutException; } + request.reset(); } CloseableHttpResponse response; @@ -90,17 +104,15 @@ public final class RestClient implements Closeable { response = client.execute(connection.getHost(), request); } catch(IOException e) { RequestLogger.log(logger, "request failed", request, connection.getHost(), e); - connectionPool.onFailure(connection); + onFailure(connection); lastSeenException = addSuppressedException(lastSeenException, e); continue; - } finally { - request.reset(); } int statusCode = response.getStatusLine().getStatusCode(); //TODO make ignore status code configurable. rest-spec and tests support that parameter (ignore_missing) if (statusCode < 300 || (request.getMethod().equals(HttpHead.METHOD_NAME) && statusCode == 404) ) { RequestLogger.log(logger, "request succeeded", request, connection.getHost(), response); - connectionPool.onSuccess(connection); + onSuccess(connection); return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response); } else { RequestLogger.log(logger, "request failed", request, connection.getHost(), response); @@ -113,10 +125,10 @@ public final class RestClient implements Closeable { lastSeenException = addSuppressedException(lastSeenException, elasticsearchResponseException); //clients don't retry on 500 because elasticsearch still misuses it instead of 400 in some places if (statusCode == 502 || statusCode == 503 || statusCode == 504) { - connectionPool.onFailure(connection); + onFailure(connection); } else { //don't retry and call onSuccess as the error should be a request problem, the node is alive - connectionPool.onSuccess(connection); + onSuccess(connection); break; } } @@ -125,6 +137,74 @@ public final class RestClient implements Closeable { throw lastSeenException; } + /** + * Returns an iterator of connections that should be used for a request call. + * Ideally, the first connection is retrieved from the iterator and used successfully for the request. + * Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried. + * The maximum total of attempts is equal to the number of connections that are available in the iterator. + * The iterator returned will never be empty, rather an {@link IllegalStateException} will be thrown in that case. + * In case there are no alive connections available, or dead ones that should be retried, one dead connection + * gets resurrected and returned. + */ + private Iterator nextConnection() { + if (this.connections.isEmpty()) { + throw new IllegalStateException("no connections available in the connection pool"); + } + + List rotatedConnections = new ArrayList<>(connections); + //TODO is it possible to make this O(1)? (rotate is O(n)) + Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement()); + Iterator connectionIterator = rotatedConnections.iterator(); + while (connectionIterator.hasNext()) { + Connection connection = connectionIterator.next(); + if (connection.isAlive() == false && connection.shouldBeRetried() == false) { + connectionIterator.remove(); + } + } + if (rotatedConnections.isEmpty()) { + List sortedConnections = new ArrayList<>(connections); + Collections.sort(sortedConnections, new Comparator() { + @Override + public int compare(Connection o1, Connection o2) { + return Long.compare(o1.getDeadUntil(), o2.getDeadUntil()); + } + }); + Connection connection = sortedConnections.get(0); + connection.markResurrected(); + logger.trace("marked connection resurrected for " + connection.getHost()); + return Collections.singleton(connection).iterator(); + } + return rotatedConnections.iterator(); + } + + /** + * Called after each successful request call. + * Receives as an argument the connection that was used for the successful request. + */ + public void onSuccess(Connection connection) { + connection.markAlive(); + logger.trace("marked connection alive for " + connection.getHost()); + } + + /** + * Called after each failed attempt. + * Receives as an argument the connection that was used for the failed attempt. + */ + private void onFailure(Connection connection) throws IOException { + connection.markDead(); + logger.debug("marked connection dead for " + connection.getHost()); + failureListener.onFailure(connection); + } + + public synchronized void setFailureListener(FailureListener failureListener) { + this.failureListener = failureListener; + } + + @Override + public void close() throws IOException { + client.close(); + } + private static IOException addSuppressedException(IOException suppressedException, IOException currentException) { if (suppressedException != null) { currentException.addSuppressed(suppressedException); @@ -178,12 +258,6 @@ public final class RestClient implements Closeable { } } - @Override - public void close() throws IOException { - connectionPool.close(); - client.close(); - } - /** * Returns a new {@link Builder} to help with {@link RestClient} creation. */ @@ -195,9 +269,11 @@ public final class RestClient implements Closeable { * Rest client builder. Helps creating a new {@link RestClient}. */ public static final class Builder { - private static final int DEFAULT_MAX_RETRY_TIMEOUT = 10000; + public static final int DEFAULT_CONNECT_TIMEOUT = 500; + public static final int DEFAULT_SOCKET_TIMEOUT = 5000; + public static final int DEFAULT_MAX_RETRY_TIMEOUT = DEFAULT_SOCKET_TIMEOUT; + public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 500; - private ConnectionPool connectionPool; private CloseableHttpClient httpClient; private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT; private HttpHost[] hosts; @@ -206,17 +282,9 @@ public final class RestClient implements Closeable { } - /** - * Sets the connection pool. {@link StaticConnectionPool} will be used if not specified. - * @see ConnectionPool - */ - public Builder setConnectionPool(ConnectionPool connectionPool) { - this.connectionPool = connectionPool; - return this; - } - /** * Sets the http client. A new default one will be created if not specified, by calling {@link #createDefaultHttpClient()}. + * * @see CloseableHttpClient */ public Builder setHttpClient(CloseableHttpClient httpClient) { @@ -227,6 +295,7 @@ public final class RestClient implements Closeable { /** * Sets the maximum timeout to honour in case of multiple retries of the same request. * {@link #DEFAULT_MAX_RETRY_TIMEOUT} if not specified. + * * @throws IllegalArgumentException if maxRetryTimeout is not greater than 0 */ public Builder setMaxRetryTimeout(int maxRetryTimeout) { @@ -256,10 +325,7 @@ public final class RestClient implements Closeable { if (httpClient == null) { httpClient = createDefaultHttpClient(); } - if (connectionPool == null) { - connectionPool = new StaticConnectionPool(hosts); - } - return new RestClient(httpClient, connectionPool, maxRetryTimeout); + return new RestClient(httpClient, maxRetryTimeout, hosts); } /** @@ -274,10 +340,21 @@ public final class RestClient implements Closeable { connectionManager.setMaxTotal(30); //default timeouts are all infinite - RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(500).setSocketTimeout(10000) - .setConnectionRequestTimeout(500).build(); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(DEFAULT_CONNECT_TIMEOUT) + .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT) + .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT).build(); return HttpClientBuilder.create().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig).build(); } } + + /** + * Listener that allows to be notified whenever a failure happens. Useful when sniffing is enabled, so that we can sniff on failure. + * The default implementation is a no-op. + */ + public static class FailureListener { + public void onFailure(Connection connection) throws IOException { + + } + } } diff --git a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java deleted file mode 100644 index 4ec0bcb3c39..00000000000 --- a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.http.HttpHost; - -import java.io.IOException; -import java.util.List; - -/** - * Static implementation of {@link ConnectionPool}. Its underlying list of connections is immutable. - */ -public class StaticConnectionPool extends ConnectionPool { - - private final List connections; - - public StaticConnectionPool(HttpHost... hosts) { - if (hosts == null || hosts.length == 0) { - throw new IllegalArgumentException("no hosts provided"); - } - this.connections = createConnections(hosts); - } - - @Override - protected List getConnections() { - return connections; - } - - @Override - public void close() throws IOException { - //no-op nothing to close - } -} diff --git a/client/src/main/java/org/elasticsearch/client/sniff/HostsSniffer.java b/client/src/main/java/org/elasticsearch/client/sniff/HostsSniffer.java new file mode 100644 index 00000000000..29117fcd856 --- /dev/null +++ b/client/src/main/java/org/elasticsearch/client/sniff/HostsSniffer.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.sniff; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.elasticsearch.client.ElasticsearchResponse; +import org.elasticsearch.client.RestClient; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Class responsible for sniffing the http hosts from elasticsearch through the nodes info api and returning them back + */ +public class HostsSniffer { + + private static final Log logger = LogFactory.getLog(HostsSniffer.class); + + private final RestClient restClient; + private final Map sniffRequestParams; + private final String scheme; + private final JsonFactory jsonFactory; + + public HostsSniffer(RestClient restClient, int sniffRequestTimeout, String scheme) { + this.restClient = restClient; + this.sniffRequestParams = Collections.singletonMap("timeout", sniffRequestTimeout + "ms"); + this.scheme = scheme; + this.jsonFactory = new JsonFactory(); + } + + /** + * Calls the elasticsearch nodes info api, parses the response and returns all the found http hosts + */ + public List sniffHosts() throws IOException { + try (ElasticsearchResponse response = restClient.performRequest("get", "/_nodes/http", sniffRequestParams, null)) { + return readHosts(response.getEntity()); + } + } + + private List readHosts(HttpEntity entity) throws IOException { + try (InputStream inputStream = entity.getContent()) { + JsonParser parser = jsonFactory.createParser(inputStream); + if (parser.nextToken() != JsonToken.START_OBJECT) { + throw new IOException("expected data to start with an object"); + } + List hosts = new ArrayList<>(); + while (parser.nextToken() != JsonToken.END_OBJECT) { + if (parser.getCurrentToken() == JsonToken.START_OBJECT) { + if ("nodes".equals(parser.getCurrentName())) { + while (parser.nextToken() != JsonToken.END_OBJECT) { + JsonToken token = parser.nextToken(); + assert token == JsonToken.START_OBJECT; + String nodeId = parser.getCurrentName(); + HttpHost sniffedHost = readHost(nodeId, parser, this.scheme); + if (sniffedHost != null) { + logger.trace("adding node [" + nodeId + "]"); + hosts.add(sniffedHost); + } + } + } else { + parser.skipChildren(); + } + } + } + return hosts; + } + } + + private static HttpHost readHost(String nodeId, JsonParser parser, String scheme) throws IOException { + HttpHost httpHost = null; + String fieldName = null; + while (parser.nextToken() != JsonToken.END_OBJECT) { + if (parser.getCurrentToken() == JsonToken.FIELD_NAME) { + fieldName = parser.getCurrentName(); + } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) { + if ("http".equals(fieldName)) { + while (parser.nextToken() != JsonToken.END_OBJECT) { + if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) { + URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString()); + httpHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(), + boundAddressAsURI.getScheme()); + } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) { + parser.skipChildren(); + } + } + } else { + parser.skipChildren(); + } + } + } + //http section is not present if http is not enabled on the node, ignore such nodes + if (httpHost == null) { + logger.debug("skipping node [" + nodeId + "] with http disabled"); + return null; + } + return httpHost; + } +} diff --git a/client/src/main/java/org/elasticsearch/client/sniff/Sniffer.java b/client/src/main/java/org/elasticsearch/client/sniff/Sniffer.java index 09a72fedb62..43ad143738a 100644 --- a/client/src/main/java/org/elasticsearch/client/sniff/Sniffer.java +++ b/client/src/main/java/org/elasticsearch/client/sniff/Sniffer.java @@ -19,130 +19,228 @@ package org.elasticsearch.client.sniff; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; -import org.elasticsearch.client.ElasticsearchResponseException; -import org.elasticsearch.client.RequestLogger; +import org.elasticsearch.client.Connection; +import org.elasticsearch.client.RestClient; +import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Calls nodes info api and returns a list of http hosts extracted from it. */ //TODO This could potentially be using _cat/nodes which wouldn't require jackson as a dependency, but we'd have bw comp problems with 2.x -final class Sniffer { +public final class Sniffer extends RestClient.FailureListener implements Closeable { private static final Log logger = LogFactory.getLog(Sniffer.class); - private final CloseableHttpClient client; - private final RequestConfig sniffRequestConfig; - private final int sniffRequestTimeout; - private final String scheme; - private final JsonFactory jsonFactory; + private final boolean sniffOnFailure; + private final Task task; - Sniffer(CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, String scheme) { - this.client = client; - this.sniffRequestConfig = sniffRequestConfig; - this.sniffRequestTimeout = sniffRequestTimeout; - this.scheme = scheme; - this.jsonFactory = new JsonFactory(); + public Sniffer(RestClient restClient, int sniffRequestTimeout, String scheme, int sniffInterval, + boolean sniffOnFailure, int sniffAfterFailureDelay) { + HostsSniffer hostsSniffer = new HostsSniffer(restClient, sniffRequestTimeout, scheme); + this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); + this.sniffOnFailure = sniffOnFailure; + restClient.setFailureListener(this); } - List sniffNodes(HttpHost host) throws IOException { - HttpGet httpGet = new HttpGet("/_nodes/http?timeout=" + sniffRequestTimeout + "ms"); - httpGet.setConfig(sniffRequestConfig); - - try (CloseableHttpResponse response = client.execute(host, httpGet)) { - StatusLine statusLine = response.getStatusLine(); - if (statusLine.getStatusCode() >= 300) { - RequestLogger.log(logger, "sniff failed", httpGet, host, response); - String responseBody = null; - if (response.getEntity() != null) { - responseBody = EntityUtils.toString(response.getEntity()); - } - throw new ElasticsearchResponseException(httpGet.getRequestLine(), host, response.getStatusLine(), responseBody); - } else { - List nodes = readHosts(response.getEntity()); - RequestLogger.log(logger, "sniff succeeded", httpGet, host, response); - return nodes; - } - } catch(IOException e) { - RequestLogger.log(logger, "sniff failed", httpGet, host, e); - throw e; + @Override + public void onFailure(Connection connection) throws IOException { + if (sniffOnFailure) { + //re-sniff immediately but take out the node that failed + task.sniffOnFailure(connection.getHost()); } } - private List readHosts(HttpEntity entity) throws IOException { - try (InputStream inputStream = entity.getContent()) { - JsonParser parser = jsonFactory.createParser(inputStream); - if (parser.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("expected data to start with an object"); - } - List hosts = new ArrayList<>(); - while (parser.nextToken() != JsonToken.END_OBJECT) { - if (parser.getCurrentToken() == JsonToken.START_OBJECT) { - if ("nodes".equals(parser.getCurrentName())) { - while (parser.nextToken() != JsonToken.END_OBJECT) { - JsonToken token = parser.nextToken(); - assert token == JsonToken.START_OBJECT; - String nodeId = parser.getCurrentName(); - HttpHost sniffedHost = readNode(nodeId, parser, this.scheme); - if (sniffedHost != null) { - logger.trace("adding node [" + nodeId + "]"); - hosts.add(sniffedHost); - } - } - } else { - parser.skipChildren(); + @Override + public void close() throws IOException { + task.shutdown(); + } + + private static class Task implements Runnable { + private final HostsSniffer hostsSniffer; + private final RestClient restClient; + + private final int sniffInterval; + private final int sniffAfterFailureDelay; + private final ScheduledExecutorService scheduledExecutorService; + private final AtomicBoolean running = new AtomicBoolean(false); + private volatile int nextSniffDelay; + private volatile ScheduledFuture scheduledFuture; + + private Task(HostsSniffer hostsSniffer, RestClient restClient, int sniffInterval, int sniffAfterFailureDelay) { + this.hostsSniffer = hostsSniffer; + this.restClient = restClient; + this.sniffInterval = sniffInterval; + this.sniffAfterFailureDelay = sniffAfterFailureDelay; + this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + this.scheduledFuture = this.scheduledExecutorService.schedule(this, 0, TimeUnit.MILLISECONDS); + this.nextSniffDelay = sniffInterval; + } + + @Override + public void run() { + sniff(null); + } + + void sniffOnFailure(HttpHost failedHost) { + this.nextSniffDelay = sniffAfterFailureDelay; + sniff(failedHost); + } + + void sniff(HttpHost excludeHost) { + if (running.compareAndSet(false, true)) { + try { + List sniffedNodes = hostsSniffer.sniffHosts(); + if (excludeHost != null) { + sniffedNodes.remove(excludeHost); + } + logger.debug("sniffed nodes: " + sniffedNodes); + this.restClient.setNodes(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()])); + } catch (Throwable t) { + logger.error("error while sniffing nodes", t); + } finally { + try { + //regardless of whether and when the next sniff is scheduled, cancel it and schedule a new one with updated delay + this.scheduledFuture.cancel(false); + logger.debug("scheduling next sniff in " + nextSniffDelay + " ms"); + this.scheduledFuture = this.scheduledExecutorService.schedule(this, nextSniffDelay, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + logger.error("error while scheduling next sniffer task", t); + } finally { + this.nextSniffDelay = sniffInterval; + running.set(false); } } } - return hosts; + } + + void shutdown() { + scheduledExecutorService.shutdown(); + try { + if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + scheduledExecutorService.shutdownNow(); } } - private static HttpHost readNode(String nodeId, JsonParser parser, String scheme) throws IOException { - HttpHost httpHost = null; - String fieldName = null; - while (parser.nextToken() != JsonToken.END_OBJECT) { - if (parser.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = parser.getCurrentName(); - } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) { - if ("http".equals(fieldName)) { - while (parser.nextToken() != JsonToken.END_OBJECT) { - if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) { - URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString()); - httpHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(), - boundAddressAsURI.getScheme()); - } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) { - parser.skipChildren(); - } - } - } else { - parser.skipChildren(); - } + /** + * Returns a new {@link Builder} to help with {@link Sniffer} creation. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Sniffer builder. Helps creating a new {@link Sniffer}. + */ + public static final class Builder { + public static final int DEFAULT_SNIFF_INTERVAL = 60000 * 5; //5 minutes + public static final int DEFAULT_SNIFF_AFTER_FAILURE_DELAY = 60000; //1 minute + public static final int DEFAULT_SNIFF_REQUEST_TIMEOUT = 1000; //1 second + + private int sniffRequestTimeout = DEFAULT_SNIFF_REQUEST_TIMEOUT; + private int sniffInterval = DEFAULT_SNIFF_INTERVAL; + private boolean sniffOnFailure = true; + private int sniffAfterFailureDelay = DEFAULT_SNIFF_AFTER_FAILURE_DELAY; + private String scheme = "http"; + private RestClient restClient; + + private Builder() { + + } + + /** + * Sets the interval between consecutive ordinary sniff executions. Will be honoured when sniffOnFailure is disabled or + * when there are no failures between consecutive sniff executions. + * @throws IllegalArgumentException if sniffInterval is not greater than 0 + */ + public Builder setSniffInterval(int sniffInterval) { + if (sniffInterval <= 0) { + throw new IllegalArgumentException("sniffInterval must be greater than 0"); } + this.sniffInterval = sniffInterval; + return this; } - //http section is not present if http is not enabled on the node, ignore such nodes - if (httpHost == null) { - logger.debug("skipping node [" + nodeId + "] with http disabled"); - return null; + + /** + * Enables/disables sniffing on failure. If enabled, at each failure nodes will be reloaded, and a new sniff execution will + * be scheduled after a shorter time than usual (sniffAfterFailureDelay). + */ + public Builder setSniffOnFailure(boolean sniffOnFailure) { + this.sniffOnFailure = sniffOnFailure; + return this; + } + + /** + * Sets the delay of a sniff execution scheduled after a failure. + */ + public Builder setSniffAfterFailureDelay(int sniffAfterFailureDelay) { + if (sniffAfterFailureDelay <= 0) { + throw new IllegalArgumentException("sniffAfterFailureDelay must be greater than 0"); + } + this.sniffAfterFailureDelay = sniffAfterFailureDelay; + return this; + } + + /** + * Sets the http client. Mandatory argument. Best practice is to use the same client used + * within {@link org.elasticsearch.client.RestClient} which can be created manually or + * through {@link RestClient.Builder#createDefaultHttpClient()}. + * @see CloseableHttpClient + */ + public Builder setRestClient(RestClient restClient) { + this.restClient = restClient; + return this; + } + + /** + * Sets the sniff request timeout to be passed in as a query string parameter to elasticsearch. + * Allows to halt the request without any failure, as only the nodes that have responded + * within this timeout will be returned. + */ + public Builder setSniffRequestTimeout(int sniffRequestTimeout) { + if (sniffRequestTimeout <=0) { + throw new IllegalArgumentException("sniffRequestTimeout must be greater than 0"); + } + this.sniffRequestTimeout = sniffRequestTimeout; + return this; + } + + /** + * Sets the scheme to be used for sniffed nodes. This information is not returned by elasticsearch, + * default is http but should be customized if https is needed/enabled. + */ + public Builder setScheme(String scheme) { + Objects.requireNonNull(scheme, "scheme cannot be null"); + if (scheme.equals("http") == false && scheme.equals("https") == false) { + throw new IllegalArgumentException("scheme must be either http or https"); + } + this.scheme = scheme; + return this; + } + + /** + * Creates the {@link Sniffer} based on the provided configuration. + */ + public Sniffer build() { + Objects.requireNonNull(restClient, "restClient cannot be null"); + return new Sniffer(restClient, sniffRequestTimeout, scheme, sniffInterval, sniffOnFailure, sniffAfterFailureDelay); } - return httpHost; } } diff --git a/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java b/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java deleted file mode 100644 index 6fd98666e8c..00000000000 --- a/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client.sniff; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpHost; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.CloseableHttpClient; -import org.elasticsearch.client.Connection; -import org.elasticsearch.client.ConnectionPool; -import org.elasticsearch.client.RestClient; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Connection pool implementation that sniffs nodes from elasticsearch at regular intervals. - * Can optionally sniff nodes on each failure as well. - */ -public class SniffingConnectionPool extends ConnectionPool { - - private static final Log logger = LogFactory.getLog(SniffingConnectionPool.class); - - private final boolean sniffOnFailure; - private final Sniffer sniffer; - private volatile List connections; - private final SnifferTask snifferTask; - - private SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay, CloseableHttpClient client, - RequestConfig sniffRequestConfig, int sniffRequestTimeout, String scheme, HttpHost... hosts) { - this.sniffOnFailure = sniffOnFailure; - this.sniffer = new Sniffer(client, sniffRequestConfig, sniffRequestTimeout, scheme); - this.connections = createConnections(hosts); - this.snifferTask = new SnifferTask(sniffInterval, sniffAfterFailureDelay); - } - - @Override - protected List getConnections() { - return this.connections; - } - - @Override - public void onFailure(Connection connection) throws IOException { - super.onFailure(connection); - if (sniffOnFailure) { - //re-sniff immediately but take out the node that failed - snifferTask.sniffOnFailure(connection.getHost()); - } - } - - @Override - public void close() throws IOException { - snifferTask.shutdown(); - } - - private class SnifferTask implements Runnable { - private final int sniffInterval; - private final int sniffAfterFailureDelay; - private final ScheduledExecutorService scheduledExecutorService; - private final AtomicBoolean running = new AtomicBoolean(false); - private volatile boolean failure = false; - private volatile ScheduledFuture scheduledFuture; - - private SnifferTask(int sniffInterval, int sniffAfterFailureDelay) { - this.sniffInterval = sniffInterval; - this.sniffAfterFailureDelay = sniffAfterFailureDelay; - this.scheduledExecutorService = Executors.newScheduledThreadPool(1); - this.scheduledFuture = this.scheduledExecutorService.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void run() { - sniff(null); - } - - void sniffOnFailure(HttpHost failedHost) { - //sync sniff straightaway on failure - failure = true; - sniff(failedHost); - } - - void sniff(HttpHost excludeHost) { - if (running.compareAndSet(false, true)) { - try { - sniff(nextConnection(), excludeHost); - } catch (Throwable t) { - logger.error("error while sniffing nodes", t); - } finally { - try { - //regardless of whether and when the next sniff is scheduled, cancel it and schedule a new one with updated delay - this.scheduledFuture.cancel(false); - if (this.failure) { - this.scheduledFuture = this.scheduledExecutorService.schedule(this, - sniffAfterFailureDelay, TimeUnit.MILLISECONDS); - this.failure = false; - } else { - this.scheduledFuture = this.scheduledExecutorService.schedule(this, sniffInterval, TimeUnit.MILLISECONDS); - } - } catch (Throwable t) { - logger.error("error while scheduling next sniffer task", t); - } finally { - running.set(false); - } - } - } - } - - void sniff(Iterator connectionIterator, HttpHost excludeHost) throws IOException { - IOException lastSeenException = null; - while (connectionIterator.hasNext()) { - Connection connection = connectionIterator.next(); - try { - List sniffedNodes = sniffer.sniffNodes(connection.getHost()); - if (excludeHost != null) { - sniffedNodes.remove(excludeHost); - } - connections = createConnections(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()])); - onSuccess(connection); - return; - } catch (IOException e) { - //here we have control over the request, if it fails something is really wrong, always call onFailure - onFailure(connection); - if (lastSeenException != null) { - e.addSuppressed(lastSeenException); - } - lastSeenException = e; - } - } - logger.warn("failed to sniff nodes", lastSeenException); - } - - void shutdown() { - scheduledExecutorService.shutdown(); - try { - if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) { - return; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - scheduledExecutorService.shutdownNow(); - } - } - - /** - * Returns a new {@link Builder} to help with {@link SniffingConnectionPool} creation. - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Sniffing connection pool builder. Helps creating a new {@link SniffingConnectionPool}. - */ - public static final class Builder { - private int sniffInterval = 5 * 1000 * 60; - private boolean sniffOnFailure = true; - private int sniffAfterFailureDelay = 60000; - private CloseableHttpClient httpClient; - private RequestConfig sniffRequestConfig; - private int sniffRequestTimeout = 1000; - private String scheme = "http"; - private HttpHost[] hosts; - - private Builder() { - - } - - /** - * Sets the interval between consecutive ordinary sniff executions. Will be honoured when sniffOnFailure is disabled or - * when there are no failures between consecutive sniff executions. - * @throws IllegalArgumentException if sniffInterval is not greater than 0 - */ - public Builder setSniffInterval(int sniffInterval) { - if (sniffInterval <= 0) { - throw new IllegalArgumentException("sniffInterval must be greater than 0"); - } - this.sniffInterval = sniffInterval; - return this; - } - - /** - * Enables/disables sniffing on failure. If enabled, at each failure nodes will be reloaded, and a new sniff execution will - * be scheduled after a shorter time than usual (sniffAfterFailureDelay). - */ - public Builder setSniffOnFailure(boolean sniffOnFailure) { - this.sniffOnFailure = sniffOnFailure; - return this; - } - - /** - * Sets the delay of a sniff execution scheduled after a failure. - */ - public Builder setSniffAfterFailureDelay(int sniffAfterFailureDelay) { - if (sniffAfterFailureDelay <= 0) { - throw new IllegalArgumentException("sniffAfterFailureDelay must be greater than 0"); - } - this.sniffAfterFailureDelay = sniffAfterFailureDelay; - return this; - } - - /** - * Sets the http client. Mandatory argument. Best practice is to use the same client used - * within {@link org.elasticsearch.client.RestClient} which can be created manually or - * through {@link RestClient.Builder#createDefaultHttpClient()}. - * @see CloseableHttpClient - */ - public Builder setHttpClient(CloseableHttpClient httpClient) { - this.httpClient = httpClient; - return this; - } - - /** - * Sets the configuration to be used for each sniff request. Useful as sniff can have - * different timeouts compared to ordinary requests. - * @see RequestConfig - */ - public Builder setSniffRequestConfig(RequestConfig sniffRequestConfig) { - this.sniffRequestConfig = sniffRequestConfig; - return this; - } - - /** - * Sets the sniff request timeout to be passed in as a query string parameter to elasticsearch. - * Allows to halt the request without any failure, as only the nodes that have responded - * within this timeout will be returned. - */ - public Builder setSniffRequestTimeout(int sniffRequestTimeout) { - if (sniffRequestTimeout <=0) { - throw new IllegalArgumentException("sniffRequestTimeout must be greater than 0"); - } - this.sniffRequestTimeout = sniffRequestTimeout; - return this; - } - - /** - * Sets the scheme to be used for sniffed nodes. This information is not returned by elasticsearch, - * default is http but should be customized if https is needed/enabled. - */ - public Builder setScheme(String scheme) { - Objects.requireNonNull(scheme, "scheme cannot be null"); - if (scheme.equals("http") == false && scheme.equals("https") == false) { - throw new IllegalArgumentException("scheme must be either http or https"); - } - this.scheme = scheme; - return this; - } - - /** - * Sets the hosts that the client will send requests to. - */ - public Builder setHosts(HttpHost... hosts) { - this.hosts = hosts; - return this; - } - - /** - * Creates the {@link SniffingConnectionPool} based on the provided configuration. - */ - public SniffingConnectionPool build() { - Objects.requireNonNull(httpClient, "httpClient cannot be null"); - if (hosts == null || hosts.length == 0) { - throw new IllegalArgumentException("no hosts provided"); - } - - if (sniffRequestConfig == null) { - sniffRequestConfig = RequestConfig.custom().setConnectTimeout(500).setSocketTimeout(1000) - .setConnectionRequestTimeout(500).build(); - } - return new SniffingConnectionPool(sniffInterval, sniffOnFailure, sniffAfterFailureDelay, httpClient, sniffRequestConfig, - sniffRequestTimeout, scheme, hosts); - } - } -} diff --git a/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java b/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java index 11008d52b19..e0828f2bd1f 100644 --- a/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java +++ b/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java @@ -25,8 +25,6 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.lucene.util.LuceneTestCase; import java.io.IOException; -import java.util.Collections; -import java.util.List; import java.util.logging.LogManager; public class RestClientBuilderTests extends LuceneTestCase { @@ -57,38 +55,23 @@ public class RestClientBuilderTests extends LuceneTestCase { assertEquals(e.getMessage(), "no hosts provided"); } - RestClient.Builder builder = RestClient.builder(); - if (random().nextBoolean()) { - ConnectionPool connectionPool = new ConnectionPool() { - @Override - protected List getConnections() { - return Collections.emptyList(); - } - - @Override - public void onSuccess(Connection connection) { - - } - - @Override - public void onFailure(Connection connection) throws IOException { - - } - - @Override - public void close() throws IOException { - - } - }; - builder.setConnectionPool(connectionPool); - } else { - int numNodes = RandomInts.randomIntBetween(random(), 1, 5); - HttpHost[] hosts = new HttpHost[numNodes]; - for (int i = 0; i < numNodes; i++) { - hosts[i] = new HttpHost("localhost", 9200 + i); - } - builder.setHosts(hosts); + try { + RestClient.builder(); + fail("should have failed"); + } catch(IllegalArgumentException e) { + assertEquals(e.getMessage(), "no hosts provided"); } + + RestClient.Builder builder = RestClient.builder(); + int numNodes = RandomInts.randomIntBetween(random(), 1, 5); + HttpHost[] hosts = new HttpHost[numNodes]; + for (int i = 0; i < numNodes; i++) { + hosts[i] = new HttpHost("localhost", 9200 + i); + } + builder.setHosts(hosts); + + //TODO test one host is null among others + if (random().nextBoolean()) { builder.setHttpClient(HttpClientBuilder.create().build()); } diff --git a/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java b/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java deleted file mode 100644 index c777666e98b..00000000000 --- a/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import com.carrotsearch.randomizedtesting.generators.RandomInts; -import org.apache.http.HttpHost; -import org.apache.lucene.util.LuceneTestCase; - -import java.util.logging.LogManager; - -public class StaticConnectionPoolTests extends LuceneTestCase { - - static { - LogManager.getLogManager().reset(); - } - - public void testConstructor() { - int numNodes = RandomInts.randomIntBetween(random(), 1, 5); - HttpHost[] hosts = new HttpHost[numNodes]; - for (int i = 0; i < numNodes; i++) { - hosts[i] = new HttpHost("localhost", 9200); - } - - try { - new StaticConnectionPool((HttpHost) null); - } catch(NullPointerException e) { - assertEquals(e.getMessage(), "host cannot be null"); - } - - try { - new StaticConnectionPool((HttpHost[])null); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no hosts provided"); - } - - try { - new StaticConnectionPool(); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no hosts provided"); - } - - StaticConnectionPool staticConnectionPool = new StaticConnectionPool(hosts); - assertNotNull(staticConnectionPool); - } -} diff --git a/client/src/test/java/org/elasticsearch/client/sniff/SnifferTests.java b/client/src/test/java/org/elasticsearch/client/sniff/HostsSnifferTests.java similarity index 84% rename from client/src/test/java/org/elasticsearch/client/sniff/SnifferTests.java rename to client/src/test/java/org/elasticsearch/client/sniff/HostsSnifferTests.java index 2fa9f7b3225..c9e1b48a791 100644 --- a/client/src/test/java/org/elasticsearch/client/sniff/SnifferTests.java +++ b/client/src/test/java/org/elasticsearch/client/sniff/HostsSnifferTests.java @@ -29,11 +29,9 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.apache.http.HttpHost; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.ElasticsearchResponseException; +import org.elasticsearch.client.RestClient; import org.junit.After; import org.junit.Before; @@ -57,7 +55,7 @@ import java.util.logging.LogManager; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; -public class SnifferTests extends LuceneTestCase { +public class HostsSnifferTests extends LuceneTestCase { static { //prevent MockWebServer from logging to stdout and stderr @@ -88,34 +86,36 @@ public class SnifferTests extends LuceneTestCase { } public void testSniffNodes() throws IOException, URISyntaxException { - CloseableHttpClient client = HttpClientBuilder.create().build(); - Sniffer sniffer = new Sniffer(client, RequestConfig.DEFAULT, sniffRequestTimeout, scheme); HttpHost httpHost = new HttpHost(server.getHostName(), server.getPort()); - try { - List sniffedHosts = sniffer.sniffNodes(httpHost); - if (sniffResponse.isFailure) { - fail("sniffNodes should have failed"); - } - assertThat(sniffedHosts.size(), equalTo(sniffResponse.hosts.size())); - Iterator responseHostsIterator = sniffResponse.hosts.iterator(); - for (HttpHost sniffedHost : sniffedHosts) { - assertEquals(sniffedHost, responseHostsIterator.next()); - } - } catch(ElasticsearchResponseException e) { - if (sniffResponse.isFailure) { - assertThat(e.getMessage(), containsString("GET http://localhost:" + server.getPort() + - "/_nodes/http?timeout=" + sniffRequestTimeout)); - assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode))); - assertThat(e.getHost(), equalTo(httpHost)); - assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode)); - assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1")); - } else { - fail("sniffNodes should have succeeded: " + e.getStatusLine()); + try (RestClient restClient = RestClient.builder().setHosts(httpHost).build()) { + HostsSniffer sniffer = new HostsSniffer(restClient, sniffRequestTimeout, scheme); + try { + List sniffedHosts = sniffer.sniffHosts(); + if (sniffResponse.isFailure) { + fail("sniffNodes should have failed"); + } + assertThat(sniffedHosts.size(), equalTo(sniffResponse.hosts.size())); + Iterator responseHostsIterator = sniffResponse.hosts.iterator(); + for (HttpHost sniffedHost : sniffedHosts) { + assertEquals(sniffedHost, responseHostsIterator.next()); + } + } catch(ElasticsearchResponseException e) { + if (sniffResponse.isFailure) { + assertThat(e.getMessage(), containsString("GET http://localhost:" + server.getPort() + + "/_nodes/http?timeout=" + sniffRequestTimeout)); + assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode))); + assertThat(e.getHost(), equalTo(httpHost)); + assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode)); + assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1")); + } else { + fail("sniffNodes should have succeeded: " + e.getStatusLine()); + } } } } - private static MockWebServer buildMockWebServer(final SniffResponse sniffResponse, final int sniffTimeout) throws UnsupportedEncodingException { + private static MockWebServer buildMockWebServer(final SniffResponse sniffResponse, final int sniffTimeout) + throws UnsupportedEncodingException { MockWebServer server = new MockWebServer(); final Dispatcher dispatcher = new Dispatcher() { @Override diff --git a/client/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java b/client/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java new file mode 100644 index 00000000000..6373fae00d5 --- /dev/null +++ b/client/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.sniff; + +import com.carrotsearch.randomizedtesting.generators.RandomInts; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.apache.http.HttpHost; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.client.RestClient; + +import java.util.Arrays; +import java.util.logging.LogManager; + +public class SnifferBuilderTests extends LuceneTestCase { + + static { + LogManager.getLogManager().reset(); + } + + public void testBuild() throws Exception { + + try { + Sniffer.builder().setScheme(null); + fail("should have failed"); + } catch(NullPointerException e) { + assertEquals(e.getMessage(), "scheme cannot be null"); + } + + try { + Sniffer.builder().setScheme("whatever"); + fail("should have failed"); + } catch(IllegalArgumentException e) { + assertEquals(e.getMessage(), "scheme must be either http or https"); + } + + try { + Sniffer.builder().setSniffInterval(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); + fail("should have failed"); + } catch(IllegalArgumentException e) { + assertEquals(e.getMessage(), "sniffInterval must be greater than 0"); + } + + try { + Sniffer.builder().setSniffRequestTimeout(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); + fail("should have failed"); + } catch(IllegalArgumentException e) { + assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0"); + } + + try { + Sniffer.builder().setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); + fail("should have failed"); + } catch(IllegalArgumentException e) { + assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0"); + } + + try { + Sniffer.builder().build(); + fail("should have failed"); + } catch(NullPointerException e) { + assertEquals(e.getMessage(), "restClient cannot be null"); + } + + int numNodes = RandomInts.randomIntBetween(random(), 1, 5); + HttpHost[] hosts = new HttpHost[numNodes]; + for (int i = 0; i < numNodes; i++) { + hosts[i] = new HttpHost("localhost", 9200 + i); + } + + try (RestClient client = RestClient.builder().setHosts(hosts).build()) { + try (Sniffer sniffer = Sniffer.builder().setRestClient(client).build()) { + assertNotNull(sniffer); + } + } + + try (RestClient client = RestClient.builder().setHosts(hosts).build()) { + Sniffer.Builder builder = Sniffer.builder().setRestClient(client); + if (random().nextBoolean()) { + builder.setScheme(RandomPicks.randomFrom(random(), Arrays.asList("http", "https"))); + } + if (random().nextBoolean()) { + builder.setSniffInterval(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); + } + if (random().nextBoolean()) { + builder.setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); + } + if (random().nextBoolean()) { + builder.setSniffRequestTimeout(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); + } + if (random().nextBoolean()) { + builder.setSniffOnFailure(random().nextBoolean()); + } + try (Sniffer connectionPool = builder.build()) { + assertNotNull(connectionPool); + } + } + } +} diff --git a/client/src/test/java/org/elasticsearch/client/sniff/SniffingConnectionPoolBuilderTests.java b/client/src/test/java/org/elasticsearch/client/sniff/SniffingConnectionPoolBuilderTests.java deleted file mode 100644 index b5b4eaa3ce1..00000000000 --- a/client/src/test/java/org/elasticsearch/client/sniff/SniffingConnectionPoolBuilderTests.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client.sniff; - -import com.carrotsearch.randomizedtesting.generators.RandomInts; -import com.carrotsearch.randomizedtesting.generators.RandomPicks; -import org.apache.http.HttpHost; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.lucene.util.LuceneTestCase; - -import java.util.Arrays; -import java.util.logging.LogManager; - -public class SniffingConnectionPoolBuilderTests extends LuceneTestCase { - - static { - LogManager.getLogManager().reset(); - } - - public void testBuild() throws Exception { - - try { - SniffingConnectionPool.builder().setScheme(null); - fail("should have failed"); - } catch(NullPointerException e) { - assertEquals(e.getMessage(), "scheme cannot be null"); - } - - try { - SniffingConnectionPool.builder().setScheme("whatever"); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "scheme must be either http or https"); - } - - try { - SniffingConnectionPool.builder().setSniffInterval(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "sniffInterval must be greater than 0"); - } - - try { - SniffingConnectionPool.builder().setSniffRequestTimeout(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0"); - } - - try { - SniffingConnectionPool.builder().setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0"); - } - - try { - SniffingConnectionPool.builder().build(); - fail("should have failed"); - } catch(NullPointerException e) { - assertEquals(e.getMessage(), "httpClient cannot be null"); - } - - try { - SniffingConnectionPool.builder().setHttpClient(HttpClientBuilder.create().build()).build(); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no hosts provided"); - } - - try { - SniffingConnectionPool.builder().setHttpClient(HttpClientBuilder.create().build()).setHosts((HttpHost[])null).build(); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no hosts provided"); - } - - try { - SniffingConnectionPool.builder().setHttpClient(HttpClientBuilder.create().build()).setHosts().build(); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no hosts provided"); - } - - int numNodes = RandomInts.randomIntBetween(random(), 1, 5); - HttpHost[] hosts = new HttpHost[numNodes]; - for (int i = 0; i < numNodes; i++) { - hosts[i] = new HttpHost("localhost", 9200 + i); - } - - try (SniffingConnectionPool connectionPool = SniffingConnectionPool.builder() - .setHttpClient(HttpClientBuilder.create().build()).setHosts(hosts).build()) { - assertNotNull(connectionPool); - } - - SniffingConnectionPool.Builder builder = SniffingConnectionPool.builder() - .setHttpClient(HttpClientBuilder.create().build()).setHosts(hosts); - if (random().nextBoolean()) { - builder.setScheme(RandomPicks.randomFrom(random(), Arrays.asList("http", "https"))); - } - if (random().nextBoolean()) { - builder.setSniffInterval(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); - } - if (random().nextBoolean()) { - builder.setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); - } - if (random().nextBoolean()) { - builder.setSniffRequestTimeout(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); - } - if (random().nextBoolean()) { - builder.setSniffOnFailure(random().nextBoolean()); - } - if (random().nextBoolean()) { - builder.setSniffRequestConfig(RequestConfig.DEFAULT); - } - try (SniffingConnectionPool connectionPool = builder.build()) { - assertNotNull(connectionPool); - } - } -}