From b38ef345e2e8eac35ae2727008f15ed26210ba31 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 10 May 2016 10:50:53 +0200 Subject: [PATCH] remove streams leftover Given that we don't use streams anymore, we can check straightaway if the connection iterator is empty before returning it and resurrect a connection when needed directly in the connection pool, no lastResortConnection method required. --- .../elasticsearch/client/ConnectionPool.java | 59 ++++++++----------- .../org/elasticsearch/client/RestClient.java | 9 +-- .../client/sniff/SniffingConnectionPool.java | 10 +--- 3 files changed, 28 insertions(+), 50 deletions(-) diff --git a/client/src/main/java/org/elasticsearch/client/ConnectionPool.java b/client/src/main/java/org/elasticsearch/client/ConnectionPool.java index 89b714f2eee..55e9554684e 100644 --- a/client/src/main/java/org/elasticsearch/client/ConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/ConnectionPool.java @@ -35,11 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Pool of connections to the different hosts that belong to an elasticsearch cluster. - * It keeps track of the different hosts to communicate with and allows to retrieve a stream of connections to be used + * It keeps track of the different hosts to communicate with and allows to retrieve an iterator of connections to be used * for each request. Marks connections as dead/alive when needed. - * Provides a stream of alive connections or dead ones that should be retried for each {@link #nextConnection()} call. - * In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()} - * and resurrected so that a last resort request attempt can be performed. + * Provides an iterator of connections to be used at each {@link #nextConnection()} call. * The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive. * The {@link #onFailure(Connection)} method marks the connection provided as an argument dead. * This base implementation doesn't define the list implementation that stores connections, so that concurrency can be @@ -60,12 +58,13 @@ public abstract class ConnectionPool implements Closeable { protected abstract List getConnections(); /** - * Returns a stream of connections that should be used for a request call. - * Ideally, the first connection is retrieved from the stream and used successfully for the request. - * Otherwise, after each failure the next connection should be retrieved from the stream so that the request can be retried. - * The maximum total of attempts is equal to the number of connections that are available in the stream. - * It may happen that the stream is empty, in which case it means that there aren't healthy connections to use. - * Then {@link #lastResortConnection()} should be called to retrieve a non healthy connection and try it. + * Returns an iterator of connections that should be used for a request call. + * Ideally, the first connection is retrieved from the iterator and used successfully for the request. + * Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried. + * The maximum total of attempts is equal to the number of connections that are available in the iterator. + * The iterator returned will never be empty, rather an {@link IllegalStateException} will be thrown in that case. + * In case there are no alive connections available, or dead ones that should be retried, one dead connection + * gets resurrected and returned. */ public final Iterator nextConnection() { List connections = getConnections(); @@ -73,17 +72,30 @@ public abstract class ConnectionPool implements Closeable { throw new IllegalStateException("no connections available in the connection pool"); } - List sortedConnections = new ArrayList<>(connections); + List rotatedConnections = new ArrayList<>(connections); //TODO is it possible to make this O(1)? (rotate is O(n)) - Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement()); - Iterator connectionIterator = sortedConnections.iterator(); + Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement()); + Iterator connectionIterator = rotatedConnections.iterator(); while (connectionIterator.hasNext()) { Connection connection = connectionIterator.next(); if (connection.isAlive() == false && connection.shouldBeRetried() == false) { connectionIterator.remove(); } } - return connectionIterator; + if (rotatedConnections.isEmpty()) { + List sortedConnections = new ArrayList<>(connections); + Collections.sort(sortedConnections, new Comparator() { + @Override + public int compare(Connection o1, Connection o2) { + return Long.compare(o1.getDeadUntil(), o2.getDeadUntil()); + } + }); + Connection connection = sortedConnections.get(0); + connection.markResurrected(); + logger.trace("marked connection resurrected for " + connection.getHost()); + return Collections.singleton(connection).iterator(); + } + return rotatedConnections.iterator(); } /** @@ -99,25 +111,6 @@ public abstract class ConnectionPool implements Closeable { return Collections.unmodifiableList(connections); } - /** - * Returns a connection that is not necessarily healthy, but can be used for a request attempt. To be called as last resort - * only in case {@link #nextConnection()} returns an empty stream. - */ - public final Connection lastResortConnection() { - List connections = getConnections(); - if (connections.isEmpty()) { - throw new IllegalStateException("no connections available in the connection pool"); - } - List sortedConnections = new ArrayList<>(connections); - Collections.sort(sortedConnections, new Comparator() { - @Override - public int compare(Connection o1, Connection o2) { - return Long.compare(o1.getDeadUntil(), o2.getDeadUntil()); - } - }); - return sortedConnections.get(0); - } - /** * Called after each successful request call. * Receives as an argument the connection that was used for the successful request. diff --git a/client/src/main/java/org/elasticsearch/client/RestClient.java b/client/src/main/java/org/elasticsearch/client/RestClient.java index f13be9e4e49..c4d6ede623e 100644 --- a/client/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/src/main/java/org/elasticsearch/client/RestClient.java @@ -39,7 +39,6 @@ import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.Collections; import java.util.Iterator; import java.util.Locale; import java.util.Map; @@ -63,13 +62,7 @@ public final class RestClient implements Closeable { throws IOException { URI uri = buildUri(endpoint, params); HttpRequestBase request = createHttpRequest(method, uri, entity); - Iterator connectionIterator = connectionPool.nextConnection(); - if (connectionIterator.hasNext() == false) { - Connection connection = connectionPool.lastResortConnection(); - logger.info("no healthy nodes available, trying " + connection.getHost()); - return performRequest(request, Collections.singleton(connection).iterator()); - } - return performRequest(request, connectionIterator); + return performRequest(request, connectionPool.nextConnection()); } private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator connectionIterator) throws IOException { diff --git a/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java b/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java index 6bc7ad84e36..6fd98666e8c 100644 --- a/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/sniff/SniffingConnectionPool.java @@ -29,7 +29,6 @@ import org.elasticsearch.client.ConnectionPool; import org.elasticsearch.client.RestClient; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -108,14 +107,7 @@ public class SniffingConnectionPool extends ConnectionPool { void sniff(HttpHost excludeHost) { if (running.compareAndSet(false, true)) { try { - Iterator connectionIterator = nextConnection(); - if (connectionIterator.hasNext()) { - sniff(connectionIterator, excludeHost); - } else { - Connection connection = lastResortConnection(); - logger.info("no healthy nodes available, trying " + connection.getHost()); - sniff(Collections.singleton(connection).iterator(), excludeHost); - } + sniff(nextConnection(), excludeHost); } catch (Throwable t) { logger.error("error while sniffing nodes", t); } finally {