diff --git a/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java deleted file mode 100644 index 3fe4ead4bbe..00000000000 --- a/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpHost; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; - -/** - * Base static connection pool implementation that marks connections as dead/alive when needed. - * Provides a stream of alive connections or dead ones that should be retried for each {@link #nextConnection()} call. - * In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()} - * and resurrected so that a last resort request attempt can be performed. - * The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive. - * The {@link #onFailure(Connection)} method marks the connection provided as an argument dead. - * This base implementation doesn't define the list implementation that stores connections, so that concurrency can be - * handled in subclasses depending on the usecase (e.g. defining the list volatile or final when needed). - */ -public abstract class AbstractStaticConnectionPool implements ConnectionPool { - - private static final Log logger = LogFactory.getLog(AbstractStaticConnectionPool.class); - - private final AtomicInteger lastConnectionIndex = new AtomicInteger(0); - - /** - * Allows to retrieve the concrete list of connections. Not defined directly as a member - * of this class as subclasses may need to handle concurrency if the list can change, for - * instance defining the field as volatile. On the other hand static implementations - * can just make the list final instead. - */ - protected abstract List getConnections(); - - @Override - public final Stream nextConnection() { - List connections = getConnections(); - if (connections.isEmpty()) { - throw new IllegalStateException("no connections available in the connection pool"); - } - - List sortedConnections = new ArrayList<>(connections); - //TODO is it possible to make this O(1)? (rotate is O(n)) - Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement()); - return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried()); - } - - /** - * Helper method to be used by subclasses when needing to create a new list - * of connections given their corresponding hosts - */ - protected List createConnections(HttpHost... hosts) { - List connections = new ArrayList<>(); - for (HttpHost host : hosts) { - Objects.requireNonNull(host, "host cannot be null"); - connections.add(new Connection(host)); - } - return Collections.unmodifiableList(connections); - } - - @Override - public Connection lastResortConnection() { - Connection Connection = getConnections().stream() - .sorted((o1, o2) -> Long.compare(o1.getDeadUntil(), o2.getDeadUntil())).findFirst().get(); - Connection.markResurrected(); - return Connection; - } - - @Override - public void onSuccess(Connection connection) { - connection.markAlive(); - logger.trace("marked connection alive for " + connection.getHost()); - } - - @Override - public void onFailure(Connection connection) throws IOException { - connection.markDead(); - logger.debug("marked connection dead for " + connection.getHost()); - } -} diff --git a/client/src/main/java/org/elasticsearch/client/ConnectionPool.java b/client/src/main/java/org/elasticsearch/client/ConnectionPool.java index b76376abf10..b0be3a27075 100644 --- a/client/src/main/java/org/elasticsearch/client/ConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/ConnectionPool.java @@ -19,16 +19,44 @@ package org.elasticsearch.client; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHost; + import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; /** * Pool of connections to the different hosts that belong to an elasticsearch cluster. * It keeps track of the different hosts to communicate with and allows to retrieve a stream of connections to be used - * for each request. Exposes the needed hooks to be able to eventually mark connections dead or alive. + * for each request. Marks connections as dead/alive when needed. + * Provides a stream of alive connections or dead ones that should be retried for each {@link #nextConnection()} call. + * In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()} + * and resurrected so that a last resort request attempt can be performed. + * The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive. + * The {@link #onFailure(Connection)} method marks the connection provided as an argument dead. + * This base implementation doesn't define the list implementation that stores connections, so that concurrency can be + * handled in subclasses depending on the usecase (e.g. defining the list volatile or final when needed). */ -public interface ConnectionPool extends Closeable { +public abstract class ConnectionPool implements Closeable { + + private static final Log logger = LogFactory.getLog(ConnectionPool.class); + + private final AtomicInteger lastConnectionIndex = new AtomicInteger(0); + + /** + * Allows to retrieve the concrete list of connections. Not defined directly as a member + * of this class as subclasses may need to handle concurrency if the list can change, for + * instance defining the field as volatile. On the other hand static implementations + * can just make the list final instead. + */ + protected abstract List getConnections(); /** * Returns a stream of connections that should be used for a request call. @@ -38,23 +66,57 @@ public interface ConnectionPool extends Closeable { * It may happen that the stream is empty, in which case it means that there aren't healthy connections to use. * Then {@link #lastResortConnection()} should be called to retrieve a non healthy connection and try it. */ - Stream nextConnection(); + public final Stream nextConnection() { + List connections = getConnections(); + if (connections.isEmpty()) { + throw new IllegalStateException("no connections available in the connection pool"); + } + + List sortedConnections = new ArrayList<>(connections); + //TODO is it possible to make this O(1)? (rotate is O(n)) + Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement()); + return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried()); + } + + /** + * Helper method to be used by subclasses when needing to create a new list + * of connections given their corresponding hosts + */ + protected final List createConnections(HttpHost... hosts) { + List connections = new ArrayList<>(); + for (HttpHost host : hosts) { + Objects.requireNonNull(host, "host cannot be null"); + connections.add(new Connection(host)); + } + return Collections.unmodifiableList(connections); + } /** * Returns a connection that is not necessarily healthy, but can be used for a request attempt. To be called as last resort - * only in case {@link #nextConnection()} returns an empty stream + * only in case {@link #nextConnection()} returns an empty stream. */ - Connection lastResortConnection(); + public final Connection lastResortConnection() { + Connection Connection = getConnections().stream() + .sorted((o1, o2) -> Long.compare(o1.getDeadUntil(), o2.getDeadUntil())).findFirst().get(); + Connection.markResurrected(); + return Connection; + } /** * Called after each successful request call. * Receives as an argument the connection that was used for the successful request. */ - void onSuccess(Connection connection); + public void onSuccess(Connection connection) { + connection.markAlive(); + logger.trace("marked connection alive for " + connection.getHost()); + } /** * Called after each failed attempt. * Receives as an argument the connection that was used for the failed attempt. */ - void onFailure(Connection connection) throws IOException; + public void onFailure(Connection connection) throws IOException { + connection.markDead(); + logger.debug("marked connection dead for " + connection.getHost()); + } } diff --git a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java index 289f14ce6ef..4ec0bcb3c39 100644 --- a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java @@ -27,7 +27,7 @@ import java.util.List; /** * Static implementation of {@link ConnectionPool}. Its underlying list of connections is immutable. */ -public class StaticConnectionPool extends AbstractStaticConnectionPool { +public class StaticConnectionPool extends ConnectionPool { private final List connections; diff --git a/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java b/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java index 934352deee0..a2973638ad4 100644 --- a/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java @@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.http.HttpHost; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; -import org.elasticsearch.client.AbstractStaticConnectionPool; +import org.elasticsearch.client.ConnectionPool; import org.elasticsearch.client.Connection; import java.io.IOException; @@ -43,7 +43,7 @@ import java.util.stream.Stream; * Connection pool implementation that sniffs nodes from elasticsearch at regular intervals. * Can optionally sniff nodes on each failure as well. */ -public class SniffingConnectionPool extends AbstractStaticConnectionPool { +public class SniffingConnectionPool extends ConnectionPool { private static final Log logger = LogFactory.getLog(SniffingConnectionPool.class); diff --git a/client/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/src/test/java/org/elasticsearch/client/RestClientTests.java index 6c3850fafcf..0d9930d9c9f 100644 --- a/client/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -25,8 +25,9 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.lucene.util.LuceneTestCase; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.logging.LogManager; -import java.util.stream.Stream; public class RestClientTests extends LuceneTestCase { @@ -38,13 +39,8 @@ public class RestClientTests extends LuceneTestCase { CloseableHttpClient httpClient = HttpClientBuilder.create().build(); ConnectionPool connectionPool = new ConnectionPool() { @Override - public Stream nextConnection() { - return null; - } - - @Override - public Connection lastResortConnection() { - return null; + protected List getConnections() { + return Collections.emptyList(); } @Override