remove notion of connection pool, invert dependency between RestClient and Sniffer

RestClient now exposes a setNodes method, which the Sniffer calls to update the hosts the client sends requests to.
javanna 2016-05-13 11:55:18 +02:00 committed by Luca Cavanna
parent b38ef345e2
commit cdffc3d15b
12 changed files with 591 additions and 873 deletions
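For orientation, a minimal usage sketch of the new wiring, assuming an elasticsearch node reachable on localhost:9200 (the class name and host are illustrative, not part of this commit):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.Sniffer;

public class SnifferUsageSketch {
    public static void main(String[] args) throws Exception {
        //seed the client with one known host; the sniffer keeps the list fresh from then on
        try (RestClient restClient = RestClient.builder()
                .setHosts(new HttpHost("localhost", 9200)).build();
             //the sniffer registers itself as the client's failure listener and periodically
             //calls restClient.setNodes(...) with the hosts found through the nodes info api
             Sniffer sniffer = Sniffer.builder().setRestClient(restClient).build()) {
            //use restClient.performRequest(...) as usual; no connection pool is involved
        }
    }
}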


@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
* Any change to the state of a connection should be made through the connection pool.
*/
public class Connection {
//TODO make these values configurable through the connection pool?
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
private static final long MAX_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
private final HttpHost host;


@@ -1,131 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Pool of connections to the different hosts that belong to an elasticsearch cluster.
* It keeps track of the different hosts to communicate with and allows retrieving an iterator of connections to be used
* for each request. Marks connections as dead/alive when needed.
* Provides an iterator of connections to be used at each {@link #nextConnection()} call.
* The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive.
* The {@link #onFailure(Connection)} method marks the connection provided as an argument dead.
* This base implementation doesn't define the list implementation that stores connections, so that concurrency can be
* handled in subclasses depending on the use case (e.g. declaring the list volatile or final as needed).
*/
public abstract class ConnectionPool implements Closeable {
private static final Log logger = LogFactory.getLog(ConnectionPool.class);
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
/**
* Allows to retrieve the concrete list of connections. Not defined directly as a member
* of this class as subclasses may need to handle concurrency if the list can change, for
* instance defining the field as volatile. On the other hand static implementations
* can just make the list final instead.
*/
protected abstract List<Connection> getConnections();
/**
* Returns an iterator of connections that should be used for a request call.
* Ideally, the first connection is retrieved from the iterator and used successfully for the request.
* Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried.
* The maximum total number of attempts is equal to the number of connections available in the iterator.
* The returned iterator will never be empty; an {@link IllegalStateException} is thrown instead in that case.
* In case there are no alive connections available, nor dead ones that are due to be retried, the dead
* connection that is closest to being retried gets resurrected and returned.
*/
public final Iterator<Connection> nextConnection() {
List<Connection> connections = getConnections();
if (connections.isEmpty()) {
throw new IllegalStateException("no connections available in the connection pool");
}
List<Connection> rotatedConnections = new ArrayList<>(connections);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement());
Iterator<Connection> connectionIterator = rotatedConnections.iterator();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
if (connection.isAlive() == false && connection.shouldBeRetried() == false) {
connectionIterator.remove();
}
}
if (rotatedConnections.isEmpty()) {
List<Connection> sortedConnections = new ArrayList<>(connections);
Collections.sort(sortedConnections, new Comparator<Connection>() {
@Override
public int compare(Connection o1, Connection o2) {
return Long.compare(o1.getDeadUntil(), o2.getDeadUntil());
}
});
Connection connection = sortedConnections.get(0);
connection.markResurrected();
logger.trace("marked connection resurrected for " + connection.getHost());
return Collections.singleton(connection).iterator();
}
return rotatedConnections.iterator();
}
/**
* Helper method to be used by subclasses when needing to create a new list
* of connections given their corresponding hosts
*/
protected final List<Connection> createConnections(HttpHost... hosts) {
List<Connection> connections = new ArrayList<>();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new Connection(host));
}
return Collections.unmodifiableList(connections);
}
/**
* Called after each successful request call.
* Receives as an argument the connection that was used for the successful request.
*/
public void onSuccess(Connection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getHost());
}
/**
* Called after each failed attempt.
* Receives as an argument the connection that was used for the failed attempt.
*/
public void onFailure(Connection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getHost());
}
}
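For reference, a self-contained sketch of the Collections.rotate round-robin that the nextConnection method above implements (and that RestClient inherits below); the host labels are placeholders:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RotationSketch {
    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("a", "b", "c");
        //simulates three consecutive lastConnectionIndex.getAndIncrement() calls:
        //each call shifts the iteration start by one, spreading requests over all hosts
        for (int lastIndex = 0; lastIndex < 3; lastIndex++) {
            List<String> rotated = new ArrayList<>(hosts);
            Collections.rotate(rotated, rotated.size() - lastIndex);
            System.out.println(rotated); //prints [a, b, c], then [b, c, a], then [c, a, b]
        }
    }
}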


@@ -39,38 +39,51 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class RestClient implements Closeable {
private static final Log logger = LogFactory.getLog(RestClient.class);
private final CloseableHttpClient client;
private final ConnectionPool connectionPool;
private final long maxRetryTimeout;
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
private volatile List<Connection> connections;
private volatile FailureListener failureListener = new FailureListener();
private RestClient(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) {
private RestClient(CloseableHttpClient client, long maxRetryTimeout, HttpHost... hosts) {
this.client = client;
this.connectionPool = connectionPool;
this.maxRetryTimeout = maxRetryTimeout;
setNodes(hosts);
}
public synchronized void setNodes(HttpHost... hosts) {
List<Connection> connections = new ArrayList<>(hosts.length);
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new Connection(host));
}
this.connections = Collections.unmodifiableList(connections);
}
public ElasticsearchResponse performRequest(String method, String endpoint, Map<String, Object> params, HttpEntity entity)
throws IOException {
URI uri = buildUri(endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity);
return performRequest(request, connectionPool.nextConnection());
}
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<Connection> connectionIterator) throws IOException {
//we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt
long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98);
IOException lastSeenException = null;
long startTime = System.nanoTime();
Iterator<Connection> connectionIterator = nextConnection();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
@@ -83,6 +96,7 @@ public final class RestClient implements Closeable {
retryTimeoutException.addSuppressed(lastSeenException);
throw retryTimeoutException;
}
request.reset();
}
CloseableHttpResponse response;
@@ -90,17 +104,15 @@
response = client.execute(connection.getHost(), request);
} catch(IOException e) {
RequestLogger.log(logger, "request failed", request, connection.getHost(), e);
connectionPool.onFailure(connection);
onFailure(connection);
lastSeenException = addSuppressedException(lastSeenException, e);
continue;
} finally {
request.reset();
}
int statusCode = response.getStatusLine().getStatusCode();
//TODO make ignore status code configurable. rest-spec and tests support that parameter (ignore_missing)
if (statusCode < 300 || (request.getMethod().equals(HttpHead.METHOD_NAME) && statusCode == 404) ) {
RequestLogger.log(logger, "request succeeded", request, connection.getHost(), response);
connectionPool.onSuccess(connection);
onSuccess(connection);
return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response);
} else {
RequestLogger.log(logger, "request failed", request, connection.getHost(), response);
@@ -113,10 +125,10 @@ public final class RestClient implements Closeable {
lastSeenException = addSuppressedException(lastSeenException, elasticsearchResponseException);
//clients don't retry on 500 because elasticsearch still misuses it instead of 400 in some places
if (statusCode == 502 || statusCode == 503 || statusCode == 504) {
connectionPool.onFailure(connection);
onFailure(connection);
} else {
//don't retry and call onSuccess as the error should be a request problem, the node is alive
connectionPool.onSuccess(connection);
onSuccess(connection);
break;
}
}
@@ -125,6 +137,74 @@
throw lastSeenException;
}
/**
* Returns an iterator of connections that should be used for a request call.
* Ideally, the first connection is retrieved from the iterator and used successfully for the request.
* Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried.
* The maximum total number of attempts is equal to the number of connections available in the iterator.
* The returned iterator will never be empty; an {@link IllegalStateException} is thrown instead in that case.
* In case there are no alive connections available, nor dead ones that are due to be retried, the dead
* connection that is closest to being retried gets resurrected and returned.
*/
private Iterator<Connection> nextConnection() {
if (this.connections.isEmpty()) {
throw new IllegalStateException("no connections available in the connection pool");
}
List<Connection> rotatedConnections = new ArrayList<>(connections);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement());
Iterator<Connection> connectionIterator = rotatedConnections.iterator();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
if (connection.isAlive() == false && connection.shouldBeRetried() == false) {
connectionIterator.remove();
}
}
if (rotatedConnections.isEmpty()) {
List<Connection> sortedConnections = new ArrayList<>(connections);
Collections.sort(sortedConnections, new Comparator<Connection>() {
@Override
public int compare(Connection o1, Connection o2) {
return Long.compare(o1.getDeadUntil(), o2.getDeadUntil());
}
});
Connection connection = sortedConnections.get(0);
connection.markResurrected();
logger.trace("marked connection resurrected for " + connection.getHost());
return Collections.singleton(connection).iterator();
}
return rotatedConnections.iterator();
}
/**
* Called after each successful request call.
* Receives as an argument the connection that was used for the successful request.
*/
public void onSuccess(Connection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getHost());
}
/**
* Called after each failed attempt.
* Receives as an argument the connection that was used for the failed attempt.
*/
private void onFailure(Connection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getHost());
failureListener.onFailure(connection);
}
public synchronized void setFailureListener(FailureListener failureListener) {
this.failureListener = failureListener;
}
@Override
public void close() throws IOException {
client.close();
}
private static IOException addSuppressedException(IOException suppressedException, IOException currentException) {
if (suppressedException != null) {
currentException.addSuppressed(suppressedException);
@@ -178,12 +258,6 @@
}
}
@Override
public void close() throws IOException {
connectionPool.close();
client.close();
}
/**
* Returns a new {@link Builder} to help with {@link RestClient} creation.
*/
@@ -195,9 +269,11 @@
* Rest client builder. Helps creating a new {@link RestClient}.
*/
public static final class Builder {
private static final int DEFAULT_MAX_RETRY_TIMEOUT = 10000;
public static final int DEFAULT_CONNECT_TIMEOUT = 500;
public static final int DEFAULT_SOCKET_TIMEOUT = 5000;
public static final int DEFAULT_MAX_RETRY_TIMEOUT = DEFAULT_SOCKET_TIMEOUT;
public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 500;
private ConnectionPool connectionPool;
private CloseableHttpClient httpClient;
private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT;
private HttpHost[] hosts;
@@ -206,17 +282,9 @@
}
/**
* Sets the connection pool. {@link StaticConnectionPool} will be used if not specified.
* @see ConnectionPool
*/
public Builder setConnectionPool(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
return this;
}
/**
* Sets the http client. A new default one will be created if not specified, by calling {@link #createDefaultHttpClient()}.
*
* @see CloseableHttpClient
*/
public Builder setHttpClient(CloseableHttpClient httpClient) {
@@ -227,6 +295,7 @@
/**
* Sets the maximum timeout to honour in case of multiple retries of the same request.
* {@link #DEFAULT_MAX_RETRY_TIMEOUT} if not specified.
*
* @throws IllegalArgumentException if maxRetryTimeout is not greater than 0
*/
public Builder setMaxRetryTimeout(int maxRetryTimeout) {
@@ -256,10 +325,7 @@
if (httpClient == null) {
httpClient = createDefaultHttpClient();
}
if (connectionPool == null) {
connectionPool = new StaticConnectionPool(hosts);
}
return new RestClient(httpClient, connectionPool, maxRetryTimeout);
return new RestClient(httpClient, maxRetryTimeout, hosts);
}
/**
@@ -274,10 +340,21 @@
connectionManager.setMaxTotal(30);
//default timeouts are all infinite
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(500).setSocketTimeout(10000)
.setConnectionRequestTimeout(500).build();
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(DEFAULT_CONNECT_TIMEOUT)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT)
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT).build();
return HttpClientBuilder.create().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig).build();
}
}
/**
* Listener that gets notified whenever a failure happens. Useful when sniffing is enabled, so that we can sniff on failure.
* The default implementation is a no-op.
*/
public static class FailureListener {
public void onFailure(Connection connection) throws IOException {
}
}
}
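The FailureListener hook introduced above is what lets the Sniffer react to failures. A minimal sketch of a custom listener, assuming a node on localhost:9200 (the logging is illustrative):

import java.io.IOException;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Connection;
import org.elasticsearch.client.RestClient;

public class FailureListenerSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient restClient = RestClient.builder()
                .setHosts(new HttpHost("localhost", 9200)).build()) {
            restClient.setFailureListener(new RestClient.FailureListener() {
                @Override
                public void onFailure(Connection connection) throws IOException {
                    //invoked by RestClient#onFailure right after the connection is marked dead
                    System.err.println("node failed: " + connection.getHost());
                }
            });
        }
    }
}

Note that the client holds a single listener, so a Sniffer created afterwards would replace a custom listener set this way with its own.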


@@ -1,50 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpHost;
import java.io.IOException;
import java.util.List;
/**
* Static implementation of {@link ConnectionPool}. Its underlying list of connections is immutable.
*/
public class StaticConnectionPool extends ConnectionPool {
private final List<Connection> connections;
public StaticConnectionPool(HttpHost... hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("no hosts provided");
}
this.connections = createConnections(hosts);
}
@Override
protected List<Connection> getConnections() {
return connections;
}
@Override
public void close() throws IOException {
//no-op nothing to close
}
}


@@ -0,0 +1,126 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.sniff;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.client.ElasticsearchResponse;
import org.elasticsearch.client.RestClient;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Class responsible for sniffing the http hosts from elasticsearch through the nodes info api and returning them
*/
public class HostsSniffer {
private static final Log logger = LogFactory.getLog(HostsSniffer.class);
private final RestClient restClient;
private final Map<String, Object> sniffRequestParams;
private final String scheme;
private final JsonFactory jsonFactory;
public HostsSniffer(RestClient restClient, int sniffRequestTimeout, String scheme) {
this.restClient = restClient;
this.sniffRequestParams = Collections.<String, Object>singletonMap("timeout", sniffRequestTimeout + "ms");
this.scheme = scheme;
this.jsonFactory = new JsonFactory();
}
/**
* Calls the elasticsearch nodes info api, parses the response and returns all the found http hosts
*/
public List<HttpHost> sniffHosts() throws IOException {
try (ElasticsearchResponse response = restClient.performRequest("get", "/_nodes/http", sniffRequestParams, null)) {
return readHosts(response.getEntity());
}
}
private List<HttpHost> readHosts(HttpEntity entity) throws IOException {
try (InputStream inputStream = entity.getContent()) {
JsonParser parser = jsonFactory.createParser(inputStream);
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("expected data to start with an object");
}
List<HttpHost> hosts = new ArrayList<>();
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("nodes".equals(parser.getCurrentName())) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
JsonToken token = parser.nextToken();
assert token == JsonToken.START_OBJECT;
String nodeId = parser.getCurrentName();
HttpHost sniffedHost = readHost(nodeId, parser, this.scheme);
if (sniffedHost != null) {
logger.trace("adding node [" + nodeId + "]");
hosts.add(sniffedHost);
}
}
} else {
parser.skipChildren();
}
}
}
return hosts;
}
}
private static HttpHost readHost(String nodeId, JsonParser parser, String scheme) throws IOException {
HttpHost httpHost = null;
String fieldName = null;
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
fieldName = parser.getCurrentName();
} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("http".equals(fieldName)) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {
URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
httpHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),
boundAddressAsURI.getScheme());
} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
parser.skipChildren();
}
}
} else {
parser.skipChildren();
}
}
}
//http section is not present if http is not enabled on the node, ignore such nodes
if (httpHost == null) {
logger.debug("skipping node [" + nodeId + "] with http disabled");
return null;
}
return httpHost;
}
}
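A minimal sketch of driving HostsSniffer directly, assuming a cluster on localhost:9200; readHosts expects a nodes info body shaped roughly like {"nodes":{"<nodeId>":{"http":{"publish_address":"127.0.0.1:9200"}}}}:

import java.util.List;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.HostsSniffer;

public class HostsSnifferSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient restClient = RestClient.builder()
                .setHosts(new HttpHost("localhost", 9200)).build()) {
            //the 1000 ends up on the request as the ?timeout=1000ms query string parameter
            HostsSniffer hostsSniffer = new HostsSniffer(restClient, 1000, "http");
            List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
            System.out.println("sniffed hosts: " + sniffedHosts);
        }
    }
}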


@@ -19,130 +19,228 @@
package org.elasticsearch.client.sniff;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.ElasticsearchResponseException;
import org.elasticsearch.client.RequestLogger;
import org.elasticsearch.client.Connection;
import org.elasticsearch.client.RestClient;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Calls nodes info api and returns a list of http hosts extracted from it.
*/
//TODO This could potentially be using _cat/nodes which wouldn't require jackson as a dependency, but we'd have bw comp problems with 2.x
final class Sniffer {
public final class Sniffer extends RestClient.FailureListener implements Closeable {
private static final Log logger = LogFactory.getLog(Sniffer.class);
private final CloseableHttpClient client;
private final RequestConfig sniffRequestConfig;
private final int sniffRequestTimeout;
private final String scheme;
private final JsonFactory jsonFactory;
private final boolean sniffOnFailure;
private final Task task;
Sniffer(CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, String scheme) {
this.client = client;
this.sniffRequestConfig = sniffRequestConfig;
this.sniffRequestTimeout = sniffRequestTimeout;
this.scheme = scheme;
this.jsonFactory = new JsonFactory();
public Sniffer(RestClient restClient, int sniffRequestTimeout, String scheme, int sniffInterval,
boolean sniffOnFailure, int sniffAfterFailureDelay) {
HostsSniffer hostsSniffer = new HostsSniffer(restClient, sniffRequestTimeout, scheme);
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay);
this.sniffOnFailure = sniffOnFailure;
restClient.setFailureListener(this);
}
List<HttpHost> sniffNodes(HttpHost host) throws IOException {
HttpGet httpGet = new HttpGet("/_nodes/http?timeout=" + sniffRequestTimeout + "ms");
httpGet.setConfig(sniffRequestConfig);
try (CloseableHttpResponse response = client.execute(host, httpGet)) {
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() >= 300) {
RequestLogger.log(logger, "sniff failed", httpGet, host, response);
String responseBody = null;
if (response.getEntity() != null) {
responseBody = EntityUtils.toString(response.getEntity());
}
throw new ElasticsearchResponseException(httpGet.getRequestLine(), host, response.getStatusLine(), responseBody);
} else {
List<HttpHost> nodes = readHosts(response.getEntity());
RequestLogger.log(logger, "sniff succeeded", httpGet, host, response);
return nodes;
}
} catch(IOException e) {
RequestLogger.log(logger, "sniff failed", httpGet, host, e);
throw e;
@Override
public void onFailure(Connection connection) throws IOException {
if (sniffOnFailure) {
//re-sniff immediately but take out the node that failed
task.sniffOnFailure(connection.getHost());
}
}
private List<HttpHost> readHosts(HttpEntity entity) throws IOException {
try (InputStream inputStream = entity.getContent()) {
JsonParser parser = jsonFactory.createParser(inputStream);
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("expected data to start with an object");
}
List<HttpHost> hosts = new ArrayList<>();
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("nodes".equals(parser.getCurrentName())) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
JsonToken token = parser.nextToken();
assert token == JsonToken.START_OBJECT;
String nodeId = parser.getCurrentName();
HttpHost sniffedHost = readNode(nodeId, parser, this.scheme);
if (sniffedHost != null) {
logger.trace("adding node [" + nodeId + "]");
hosts.add(sniffedHost);
}
}
} else {
parser.skipChildren();
@Override
public void close() throws IOException {
task.shutdown();
}
private static class Task implements Runnable {
private final HostsSniffer hostsSniffer;
private final RestClient restClient;
private final int sniffInterval;
private final int sniffAfterFailureDelay;
private final ScheduledExecutorService scheduledExecutorService;
private final AtomicBoolean running = new AtomicBoolean(false);
private volatile int nextSniffDelay;
private volatile ScheduledFuture<?> scheduledFuture;
private Task(HostsSniffer hostsSniffer, RestClient restClient, int sniffInterval, int sniffAfterFailureDelay) {
this.hostsSniffer = hostsSniffer;
this.restClient = restClient;
this.sniffInterval = sniffInterval;
this.sniffAfterFailureDelay = sniffAfterFailureDelay;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.scheduledFuture = this.scheduledExecutorService.schedule(this, 0, TimeUnit.MILLISECONDS);
this.nextSniffDelay = sniffInterval;
}
@Override
public void run() {
sniff(null);
}
void sniffOnFailure(HttpHost failedHost) {
this.nextSniffDelay = sniffAfterFailureDelay;
sniff(failedHost);
}
void sniff(HttpHost excludeHost) {
if (running.compareAndSet(false, true)) {
try {
List<HttpHost> sniffedNodes = hostsSniffer.sniffHosts();
if (excludeHost != null) {
sniffedNodes.remove(excludeHost);
}
logger.debug("sniffed nodes: " + sniffedNodes);
this.restClient.setNodes(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()]));
} catch (Throwable t) {
logger.error("error while sniffing nodes", t);
} finally {
try {
//regardless of whether and when the next sniff is scheduled, cancel it and schedule a new one with updated delay
this.scheduledFuture.cancel(false);
logger.debug("scheduling next sniff in " + nextSniffDelay + " ms");
this.scheduledFuture = this.scheduledExecutorService.schedule(this, nextSniffDelay, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("error while scheduling next sniffer task", t);
} finally {
this.nextSniffDelay = sniffInterval;
running.set(false);
}
}
}
return hosts;
}
void shutdown() {
scheduledExecutorService.shutdown();
try {
if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduledExecutorService.shutdownNow();
}
}
private static HttpHost readNode(String nodeId, JsonParser parser, String scheme) throws IOException {
HttpHost httpHost = null;
String fieldName = null;
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
fieldName = parser.getCurrentName();
} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("http".equals(fieldName)) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {
URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
httpHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),
boundAddressAsURI.getScheme());
} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
parser.skipChildren();
}
}
} else {
parser.skipChildren();
}
/**
* Returns a new {@link Builder} to help with {@link Sniffer} creation.
*/
public static Builder builder() {
return new Builder();
}
/**
* Sniffer builder. Helps creating a new {@link Sniffer}.
*/
public static final class Builder {
public static final int DEFAULT_SNIFF_INTERVAL = 60000 * 5; //5 minutes
public static final int DEFAULT_SNIFF_AFTER_FAILURE_DELAY = 60000; //1 minute
public static final int DEFAULT_SNIFF_REQUEST_TIMEOUT = 1000; //1 second
private int sniffRequestTimeout = DEFAULT_SNIFF_REQUEST_TIMEOUT;
private int sniffInterval = DEFAULT_SNIFF_INTERVAL;
private boolean sniffOnFailure = true;
private int sniffAfterFailureDelay = DEFAULT_SNIFF_AFTER_FAILURE_DELAY;
private String scheme = "http";
private RestClient restClient;
private Builder() {
}
/**
* Sets the interval between consecutive ordinary sniff executions. Will be honoured when sniffOnFailure is disabled or
* when there are no failures between consecutive sniff executions.
* @throws IllegalArgumentException if sniffInterval is not greater than 0
*/
public Builder setSniffInterval(int sniffInterval) {
if (sniffInterval <= 0) {
throw new IllegalArgumentException("sniffInterval must be greater than 0");
}
this.sniffInterval = sniffInterval;
return this;
}
//http section is not present if http is not enabled on the node, ignore such nodes
if (httpHost == null) {
logger.debug("skipping node [" + nodeId + "] with http disabled");
return null;
/**
* Enables/disables sniffing on failure. If enabled, at each failure nodes will be reloaded, and a new sniff execution will
* be scheduled after a shorter time than usual (sniffAfterFailureDelay).
*/
public Builder setSniffOnFailure(boolean sniffOnFailure) {
this.sniffOnFailure = sniffOnFailure;
return this;
}
/**
* Sets the delay of a sniff execution scheduled after a failure.
*/
public Builder setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
if (sniffAfterFailureDelay <= 0) {
throw new IllegalArgumentException("sniffAfterFailureDelay must be greater than 0");
}
this.sniffAfterFailureDelay = sniffAfterFailureDelay;
return this;
}
/**
* Sets the rest client. Mandatory argument. The sniffer calls the elasticsearch nodes info api through it
* and pushes the sniffed hosts back to it via {@link RestClient#setNodes(HttpHost...)} at each sniff round.
* @see RestClient
*/
public Builder setRestClient(RestClient restClient) {
this.restClient = restClient;
return this;
}
/**
* Sets the sniff request timeout to be passed in as a query string parameter to elasticsearch.
* Allows the request to be halted without any failure, as only the nodes that have responded
* within this timeout will be returned.
*/
public Builder setSniffRequestTimeout(int sniffRequestTimeout) {
if (sniffRequestTimeout <=0) {
throw new IllegalArgumentException("sniffRequestTimeout must be greater than 0");
}
this.sniffRequestTimeout = sniffRequestTimeout;
return this;
}
/**
* Sets the scheme to be used for sniffed nodes. This information is not returned by elasticsearch;
* the default is http, but it should be customized if https is needed/enabled.
*/
public Builder setScheme(String scheme) {
Objects.requireNonNull(scheme, "scheme cannot be null");
if (scheme.equals("http") == false && scheme.equals("https") == false) {
throw new IllegalArgumentException("scheme must be either http or https");
}
this.scheme = scheme;
return this;
}
/**
* Creates the {@link Sniffer} based on the provided configuration.
*/
public Sniffer build() {
Objects.requireNonNull(restClient, "restClient cannot be null");
return new Sniffer(restClient, sniffRequestTimeout, scheme, sniffInterval, sniffOnFailure, sniffAfterFailureDelay);
}
return httpHost;
}
}
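Putting the scheduling knobs together, a hedged configuration sketch (the values are illustrative, not recommendations):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.Sniffer;

public class SnifferConfigSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient restClient = RestClient.builder()
                .setHosts(new HttpHost("localhost", 9200)).build();
             Sniffer sniffer = Sniffer.builder()
                     .setRestClient(restClient)
                     .setSniffInterval(60000) //ordinary sniff round every minute
                     .setSniffOnFailure(true) //re-sniff immediately when a node fails
                     .setSniffAfterFailureDelay(30000) //next round 30s after a failure
                     .build()) {
            //the sniffer keeps restClient.setNodes(...) up to date until both are closed
        }
    }
}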


@@ -1,299 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.sniff;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.elasticsearch.client.Connection;
import org.elasticsearch.client.ConnectionPool;
import org.elasticsearch.client.RestClient;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Connection pool implementation that sniffs nodes from elasticsearch at regular intervals.
* Can optionally sniff nodes on each failure as well.
*/
public class SniffingConnectionPool extends ConnectionPool {
private static final Log logger = LogFactory.getLog(SniffingConnectionPool.class);
private final boolean sniffOnFailure;
private final Sniffer sniffer;
private volatile List<Connection> connections;
private final SnifferTask snifferTask;
private SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay, CloseableHttpClient client,
RequestConfig sniffRequestConfig, int sniffRequestTimeout, String scheme, HttpHost... hosts) {
this.sniffOnFailure = sniffOnFailure;
this.sniffer = new Sniffer(client, sniffRequestConfig, sniffRequestTimeout, scheme);
this.connections = createConnections(hosts);
this.snifferTask = new SnifferTask(sniffInterval, sniffAfterFailureDelay);
}
@Override
protected List<Connection> getConnections() {
return this.connections;
}
@Override
public void onFailure(Connection connection) throws IOException {
super.onFailure(connection);
if (sniffOnFailure) {
//re-sniff immediately but take out the node that failed
snifferTask.sniffOnFailure(connection.getHost());
}
}
@Override
public void close() throws IOException {
snifferTask.shutdown();
}
private class SnifferTask implements Runnable {
private final int sniffInterval;
private final int sniffAfterFailureDelay;
private final ScheduledExecutorService scheduledExecutorService;
private final AtomicBoolean running = new AtomicBoolean(false);
private volatile boolean failure = false;
private volatile ScheduledFuture<?> scheduledFuture;
private SnifferTask(int sniffInterval, int sniffAfterFailureDelay) {
this.sniffInterval = sniffInterval;
this.sniffAfterFailureDelay = sniffAfterFailureDelay;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.scheduledFuture = this.scheduledExecutorService.schedule(this, 0, TimeUnit.MILLISECONDS);
}
@Override
public void run() {
sniff(null);
}
void sniffOnFailure(HttpHost failedHost) {
//sync sniff straightaway on failure
failure = true;
sniff(failedHost);
}
void sniff(HttpHost excludeHost) {
if (running.compareAndSet(false, true)) {
try {
sniff(nextConnection(), excludeHost);
} catch (Throwable t) {
logger.error("error while sniffing nodes", t);
} finally {
try {
//regardless of whether and when the next sniff is scheduled, cancel it and schedule a new one with updated delay
this.scheduledFuture.cancel(false);
if (this.failure) {
this.scheduledFuture = this.scheduledExecutorService.schedule(this,
sniffAfterFailureDelay, TimeUnit.MILLISECONDS);
this.failure = false;
} else {
this.scheduledFuture = this.scheduledExecutorService.schedule(this, sniffInterval, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
logger.error("error while scheduling next sniffer task", t);
} finally {
running.set(false);
}
}
}
}
void sniff(Iterator<Connection> connectionIterator, HttpHost excludeHost) throws IOException {
IOException lastSeenException = null;
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
try {
List<HttpHost> sniffedNodes = sniffer.sniffNodes(connection.getHost());
if (excludeHost != null) {
sniffedNodes.remove(excludeHost);
}
connections = createConnections(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()]));
onSuccess(connection);
return;
} catch (IOException e) {
//here we have control over the request, if it fails something is really wrong, always call onFailure
onFailure(connection);
if (lastSeenException != null) {
e.addSuppressed(lastSeenException);
}
lastSeenException = e;
}
}
logger.warn("failed to sniff nodes", lastSeenException);
}
void shutdown() {
scheduledExecutorService.shutdown();
try {
if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduledExecutorService.shutdownNow();
}
}
/**
* Returns a new {@link Builder} to help with {@link SniffingConnectionPool} creation.
*/
public static Builder builder() {
return new Builder();
}
/**
* Sniffing connection pool builder. Helps creating a new {@link SniffingConnectionPool}.
*/
public static final class Builder {
private int sniffInterval = 5 * 1000 * 60;
private boolean sniffOnFailure = true;
private int sniffAfterFailureDelay = 60000;
private CloseableHttpClient httpClient;
private RequestConfig sniffRequestConfig;
private int sniffRequestTimeout = 1000;
private String scheme = "http";
private HttpHost[] hosts;
private Builder() {
}
/**
* Sets the interval between consecutive ordinary sniff executions. Will be honoured when sniffOnFailure is disabled or
* when there are no failures between consecutive sniff executions.
* @throws IllegalArgumentException if sniffInterval is not greater than 0
*/
public Builder setSniffInterval(int sniffInterval) {
if (sniffInterval <= 0) {
throw new IllegalArgumentException("sniffInterval must be greater than 0");
}
this.sniffInterval = sniffInterval;
return this;
}
/**
* Enables/disables sniffing on failure. If enabled, at each failure nodes will be reloaded, and a new sniff execution will
* be scheduled after a shorter time than usual (sniffAfterFailureDelay).
*/
public Builder setSniffOnFailure(boolean sniffOnFailure) {
this.sniffOnFailure = sniffOnFailure;
return this;
}
/**
* Sets the delay of a sniff execution scheduled after a failure.
*/
public Builder setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
if (sniffAfterFailureDelay <= 0) {
throw new IllegalArgumentException("sniffAfterFailureDelay must be greater than 0");
}
this.sniffAfterFailureDelay = sniffAfterFailureDelay;
return this;
}
/**
* Sets the http client. Mandatory argument. Best practice is to use the same client used
* within {@link org.elasticsearch.client.RestClient} which can be created manually or
* through {@link RestClient.Builder#createDefaultHttpClient()}.
* @see CloseableHttpClient
*/
public Builder setHttpClient(CloseableHttpClient httpClient) {
this.httpClient = httpClient;
return this;
}
/**
* Sets the configuration to be used for each sniff request. Useful as sniff can have
* different timeouts compared to ordinary requests.
* @see RequestConfig
*/
public Builder setSniffRequestConfig(RequestConfig sniffRequestConfig) {
this.sniffRequestConfig = sniffRequestConfig;
return this;
}
/**
* Sets the sniff request timeout to be passed in as a query string parameter to elasticsearch.
* Allows the request to be halted without any failure, as only the nodes that have responded
* within this timeout will be returned.
*/
public Builder setSniffRequestTimeout(int sniffRequestTimeout) {
if (sniffRequestTimeout <=0) {
throw new IllegalArgumentException("sniffRequestTimeout must be greater than 0");
}
this.sniffRequestTimeout = sniffRequestTimeout;
return this;
}
/**
* Sets the scheme to be used for sniffed nodes. This information is not returned by elasticsearch;
* the default is http, but it should be customized if https is needed/enabled.
*/
public Builder setScheme(String scheme) {
Objects.requireNonNull(scheme, "scheme cannot be null");
if (scheme.equals("http") == false && scheme.equals("https") == false) {
throw new IllegalArgumentException("scheme must be either http or https");
}
this.scheme = scheme;
return this;
}
/**
* Sets the hosts that the client will send requests to.
*/
public Builder setHosts(HttpHost... hosts) {
this.hosts = hosts;
return this;
}
/**
* Creates the {@link SniffingConnectionPool} based on the provided configuration.
*/
public SniffingConnectionPool build() {
Objects.requireNonNull(httpClient, "httpClient cannot be null");
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("no hosts provided");
}
if (sniffRequestConfig == null) {
sniffRequestConfig = RequestConfig.custom().setConnectTimeout(500).setSocketTimeout(1000)
.setConnectionRequestTimeout(500).build();
}
return new SniffingConnectionPool(sniffInterval, sniffOnFailure, sniffAfterFailureDelay, httpClient, sniffRequestConfig,
sniffRequestTimeout, scheme, hosts);
}
}
}


@@ -25,8 +25,6 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.lucene.util.LuceneTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.logging.LogManager;
public class RestClientBuilderTests extends LuceneTestCase {
@@ -57,38 +55,23 @@
assertEquals(e.getMessage(), "no hosts provided");
}
RestClient.Builder builder = RestClient.builder();
if (random().nextBoolean()) {
ConnectionPool connectionPool = new ConnectionPool() {
@Override
protected List<Connection> getConnections() {
return Collections.emptyList();
}
@Override
public void onSuccess(Connection connection) {
}
@Override
public void onFailure(Connection connection) throws IOException {
}
@Override
public void close() throws IOException {
}
};
builder.setConnectionPool(connectionPool);
} else {
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200 + i);
}
builder.setHosts(hosts);
try {
RestClient.builder();
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no hosts provided");
}
RestClient.Builder builder = RestClient.builder();
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200 + i);
}
builder.setHosts(hosts);
//TODO test one host is null among others
if (random().nextBoolean()) {
builder.setHttpClient(HttpClientBuilder.create().build());
}


@@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import org.apache.http.HttpHost;
import org.apache.lucene.util.LuceneTestCase;
import java.util.logging.LogManager;
public class StaticConnectionPoolTests extends LuceneTestCase {
static {
LogManager.getLogManager().reset();
}
public void testConstructor() {
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200);
}
try {
new StaticConnectionPool((HttpHost) null);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "host cannot be null");
}
try {
new StaticConnectionPool((HttpHost[])null);
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no hosts provided");
}
try {
new StaticConnectionPool();
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no hosts provided");
}
StaticConnectionPool staticConnectionPool = new StaticConnectionPool(hosts);
assertNotNull(staticConnectionPool);
}
}


@@ -29,11 +29,9 @@ import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.ElasticsearchResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.After;
import org.junit.Before;
@@ -57,7 +55,7 @@ import java.util.logging.LogManager;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
public class SnifferTests extends LuceneTestCase {
public class HostsSnifferTests extends LuceneTestCase {
static {
//prevent MockWebServer from logging to stdout and stderr
@@ -88,34 +86,36 @@
}
public void testSniffNodes() throws IOException, URISyntaxException {
CloseableHttpClient client = HttpClientBuilder.create().build();
Sniffer sniffer = new Sniffer(client, RequestConfig.DEFAULT, sniffRequestTimeout, scheme);
HttpHost httpHost = new HttpHost(server.getHostName(), server.getPort());
try {
List<HttpHost> sniffedHosts = sniffer.sniffNodes(httpHost);
if (sniffResponse.isFailure) {
fail("sniffNodes should have failed");
}
assertThat(sniffedHosts.size(), equalTo(sniffResponse.hosts.size()));
Iterator<HttpHost> responseHostsIterator = sniffResponse.hosts.iterator();
for (HttpHost sniffedHost : sniffedHosts) {
assertEquals(sniffedHost, responseHostsIterator.next());
}
} catch(ElasticsearchResponseException e) {
if (sniffResponse.isFailure) {
assertThat(e.getMessage(), containsString("GET http://localhost:" + server.getPort() +
"/_nodes/http?timeout=" + sniffRequestTimeout));
assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode)));
assertThat(e.getHost(), equalTo(httpHost));
assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode));
assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1"));
} else {
fail("sniffNodes should have succeeded: " + e.getStatusLine());
try (RestClient restClient = RestClient.builder().setHosts(httpHost).build()) {
HostsSniffer sniffer = new HostsSniffer(restClient, sniffRequestTimeout, scheme);
try {
List<HttpHost> sniffedHosts = sniffer.sniffHosts();
if (sniffResponse.isFailure) {
fail("sniffNodes should have failed");
}
assertThat(sniffedHosts.size(), equalTo(sniffResponse.hosts.size()));
Iterator<HttpHost> responseHostsIterator = sniffResponse.hosts.iterator();
for (HttpHost sniffedHost : sniffedHosts) {
assertEquals(sniffedHost, responseHostsIterator.next());
}
} catch(ElasticsearchResponseException e) {
if (sniffResponse.isFailure) {
assertThat(e.getMessage(), containsString("GET http://localhost:" + server.getPort() +
"/_nodes/http?timeout=" + sniffRequestTimeout));
assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode)));
assertThat(e.getHost(), equalTo(httpHost));
assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode));
assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1"));
} else {
fail("sniffNodes should have succeeded: " + e.getStatusLine());
}
}
}
}
private static MockWebServer buildMockWebServer(final SniffResponse sniffResponse, final int sniffTimeout) throws UnsupportedEncodingException {
private static MockWebServer buildMockWebServer(final SniffResponse sniffResponse, final int sniffTimeout)
throws UnsupportedEncodingException {
MockWebServer server = new MockWebServer();
final Dispatcher dispatcher = new Dispatcher() {
@Override


@@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.sniff;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.RestClient;
import java.util.Arrays;
import java.util.logging.LogManager;
public class SnifferBuilderTests extends LuceneTestCase {
static {
LogManager.getLogManager().reset();
}
public void testBuild() throws Exception {
try {
Sniffer.builder().setScheme(null);
fail("should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "scheme cannot be null");
}
try {
Sniffer.builder().setScheme("whatever");
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "scheme must be either http or https");
}
try {
Sniffer.builder().setSniffInterval(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffInterval must be greater than 0");
}
try {
Sniffer.builder().setSniffRequestTimeout(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0");
}
try {
Sniffer.builder().setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0");
}
try {
Sniffer.builder().build();
fail("should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "restClient cannot be null");
}
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200 + i);
}
try (RestClient client = RestClient.builder().setHosts(hosts).build()) {
try (Sniffer sniffer = Sniffer.builder().setRestClient(client).build()) {
assertNotNull(sniffer);
}
}
try (RestClient client = RestClient.builder().setHosts(hosts).build()) {
Sniffer.Builder builder = Sniffer.builder().setRestClient(client);
if (random().nextBoolean()) {
builder.setScheme(RandomPicks.randomFrom(random(), Arrays.asList("http", "https")));
}
if (random().nextBoolean()) {
builder.setSniffInterval(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) {
builder.setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) {
builder.setSniffRequestTimeout(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) {
builder.setSniffOnFailure(random().nextBoolean());
}
try (Sniffer sniffer = builder.build()) {
assertNotNull(sniffer);
}
}
}
}


@@ -1,138 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.sniff;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.lucene.util.LuceneTestCase;
import java.util.Arrays;
import java.util.logging.LogManager;
public class SniffingConnectionPoolBuilderTests extends LuceneTestCase {
static {
LogManager.getLogManager().reset();
}
public void testBuild() throws Exception {
try {
SniffingConnectionPool.builder().setScheme(null);
fail("should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "scheme cannot be null");
}
try {
SniffingConnectionPool.builder().setScheme("whatever");
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "scheme must be either http or https");
}
try {
SniffingConnectionPool.builder().setSniffInterval(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffInterval must be greater than 0");
}
try {
SniffingConnectionPool.builder().setSniffRequestTimeout(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0");
}
try {
SniffingConnectionPool.builder().setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0");
}
try {
SniffingConnectionPool.builder().build();
fail("should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "httpClient cannot be null");
}
try {
SniffingConnectionPool.builder().setHttpClient(HttpClientBuilder.create().build()).build();
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no hosts provided");
}
try {
SniffingConnectionPool.builder().setHttpClient(HttpClientBuilder.create().build()).setHosts((HttpHost[])null).build();
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no hosts provided");
}
try {
SniffingConnectionPool.builder().setHttpClient(HttpClientBuilder.create().build()).setHosts().build();
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no hosts provided");
}
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200 + i);
}
try (SniffingConnectionPool connectionPool = SniffingConnectionPool.builder()
.setHttpClient(HttpClientBuilder.create().build()).setHosts(hosts).build()) {
assertNotNull(connectionPool);
}
SniffingConnectionPool.Builder builder = SniffingConnectionPool.builder()
.setHttpClient(HttpClientBuilder.create().build()).setHosts(hosts);
if (random().nextBoolean()) {
builder.setScheme(RandomPicks.randomFrom(random(), Arrays.asList("http", "https")));
}
if (random().nextBoolean()) {
builder.setSniffInterval(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) {
builder.setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) {
builder.setSniffRequestTimeout(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) {
builder.setSniffOnFailure(random().nextBoolean());
}
if (random().nextBoolean()) {
builder.setSniffRequestConfig(RequestConfig.DEFAULT);
}
try (SniffingConnectionPool connectionPool = builder.build()) {
assertNotNull(connectionPool);
}
}
}