remove streams leftover
Given that we don't use streams anymore, we can check straightaway if the connection iterator is empty before returning it and resurrect a connection when needed directly in the connection pool, no lastResortConnection method required.
This commit is contained in:
parent
599dad560c
commit
b38ef345e2
|
@ -35,11 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pool of connections to the different hosts that belong to an elasticsearch cluster.
|
* Pool of connections to the different hosts that belong to an elasticsearch cluster.
|
||||||
* It keeps track of the different hosts to communicate with and allows to retrieve a stream of connections to be used
|
* It keeps track of the different hosts to communicate with and allows to retrieve an iterator of connections to be used
|
||||||
* for each request. Marks connections as dead/alive when needed.
|
* for each request. Marks connections as dead/alive when needed.
|
||||||
* Provides a stream of alive connections or dead ones that should be retried for each {@link #nextConnection()} call.
|
* Provides an iterator of connections to be used at each {@link #nextConnection()} call.
|
||||||
* In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()}
|
|
||||||
* and resurrected so that a last resort request attempt can be performed.
|
|
||||||
* The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive.
|
* The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive.
|
||||||
* The {@link #onFailure(Connection)} method marks the connection provided as an argument dead.
|
* The {@link #onFailure(Connection)} method marks the connection provided as an argument dead.
|
||||||
* This base implementation doesn't define the list implementation that stores connections, so that concurrency can be
|
* This base implementation doesn't define the list implementation that stores connections, so that concurrency can be
|
||||||
|
@ -60,12 +58,13 @@ public abstract class ConnectionPool implements Closeable {
|
||||||
protected abstract List<Connection> getConnections();
|
protected abstract List<Connection> getConnections();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a stream of connections that should be used for a request call.
|
* Returns an iterator of connections that should be used for a request call.
|
||||||
* Ideally, the first connection is retrieved from the stream and used successfully for the request.
|
* Ideally, the first connection is retrieved from the iterator and used successfully for the request.
|
||||||
* Otherwise, after each failure the next connection should be retrieved from the stream so that the request can be retried.
|
* Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried.
|
||||||
* The maximum total of attempts is equal to the number of connections that are available in the stream.
|
* The maximum total of attempts is equal to the number of connections that are available in the iterator.
|
||||||
* It may happen that the stream is empty, in which case it means that there aren't healthy connections to use.
|
* The iterator returned will never be empty, rather an {@link IllegalStateException} will be thrown in that case.
|
||||||
* Then {@link #lastResortConnection()} should be called to retrieve a non healthy connection and try it.
|
* In case there are no alive connections available, or dead ones that should be retried, one dead connection
|
||||||
|
* gets resurrected and returned.
|
||||||
*/
|
*/
|
||||||
public final Iterator<Connection> nextConnection() {
|
public final Iterator<Connection> nextConnection() {
|
||||||
List<Connection> connections = getConnections();
|
List<Connection> connections = getConnections();
|
||||||
|
@ -73,17 +72,30 @@ public abstract class ConnectionPool implements Closeable {
|
||||||
throw new IllegalStateException("no connections available in the connection pool");
|
throw new IllegalStateException("no connections available in the connection pool");
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Connection> sortedConnections = new ArrayList<>(connections);
|
List<Connection> rotatedConnections = new ArrayList<>(connections);
|
||||||
//TODO is it possible to make this O(1)? (rotate is O(n))
|
//TODO is it possible to make this O(1)? (rotate is O(n))
|
||||||
Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement());
|
Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement());
|
||||||
Iterator<Connection> connectionIterator = sortedConnections.iterator();
|
Iterator<Connection> connectionIterator = rotatedConnections.iterator();
|
||||||
while (connectionIterator.hasNext()) {
|
while (connectionIterator.hasNext()) {
|
||||||
Connection connection = connectionIterator.next();
|
Connection connection = connectionIterator.next();
|
||||||
if (connection.isAlive() == false && connection.shouldBeRetried() == false) {
|
if (connection.isAlive() == false && connection.shouldBeRetried() == false) {
|
||||||
connectionIterator.remove();
|
connectionIterator.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return connectionIterator;
|
if (rotatedConnections.isEmpty()) {
|
||||||
|
List<Connection> sortedConnections = new ArrayList<>(connections);
|
||||||
|
Collections.sort(sortedConnections, new Comparator<Connection>() {
|
||||||
|
@Override
|
||||||
|
public int compare(Connection o1, Connection o2) {
|
||||||
|
return Long.compare(o1.getDeadUntil(), o2.getDeadUntil());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Connection connection = sortedConnections.get(0);
|
||||||
|
connection.markResurrected();
|
||||||
|
logger.trace("marked connection resurrected for " + connection.getHost());
|
||||||
|
return Collections.singleton(connection).iterator();
|
||||||
|
}
|
||||||
|
return rotatedConnections.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,25 +111,6 @@ public abstract class ConnectionPool implements Closeable {
|
||||||
return Collections.unmodifiableList(connections);
|
return Collections.unmodifiableList(connections);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a connection that is not necessarily healthy, but can be used for a request attempt. To be called as last resort
|
|
||||||
* only in case {@link #nextConnection()} returns an empty stream.
|
|
||||||
*/
|
|
||||||
public final Connection lastResortConnection() {
|
|
||||||
List<Connection> connections = getConnections();
|
|
||||||
if (connections.isEmpty()) {
|
|
||||||
throw new IllegalStateException("no connections available in the connection pool");
|
|
||||||
}
|
|
||||||
List<Connection> sortedConnections = new ArrayList<>(connections);
|
|
||||||
Collections.sort(sortedConnections, new Comparator<Connection>() {
|
|
||||||
@Override
|
|
||||||
public int compare(Connection o1, Connection o2) {
|
|
||||||
return Long.compare(o1.getDeadUntil(), o2.getDeadUntil());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return sortedConnections.get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called after each successful request call.
|
* Called after each successful request call.
|
||||||
* Receives as an argument the connection that was used for the successful request.
|
* Receives as an argument the connection that was used for the successful request.
|
||||||
|
|
|
@ -39,7 +39,6 @@ import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -63,13 +62,7 @@ public final class RestClient implements Closeable {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
URI uri = buildUri(endpoint, params);
|
URI uri = buildUri(endpoint, params);
|
||||||
HttpRequestBase request = createHttpRequest(method, uri, entity);
|
HttpRequestBase request = createHttpRequest(method, uri, entity);
|
||||||
Iterator<Connection> connectionIterator = connectionPool.nextConnection();
|
return performRequest(request, connectionPool.nextConnection());
|
||||||
if (connectionIterator.hasNext() == false) {
|
|
||||||
Connection connection = connectionPool.lastResortConnection();
|
|
||||||
logger.info("no healthy nodes available, trying " + connection.getHost());
|
|
||||||
return performRequest(request, Collections.singleton(connection).iterator());
|
|
||||||
}
|
|
||||||
return performRequest(request, connectionIterator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<Connection> connectionIterator) throws IOException {
|
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<Connection> connectionIterator) throws IOException {
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.client.ConnectionPool;
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -108,14 +107,7 @@ public class SniffingConnectionPool extends ConnectionPool {
|
||||||
void sniff(HttpHost excludeHost) {
|
void sniff(HttpHost excludeHost) {
|
||||||
if (running.compareAndSet(false, true)) {
|
if (running.compareAndSet(false, true)) {
|
||||||
try {
|
try {
|
||||||
Iterator<Connection> connectionIterator = nextConnection();
|
sniff(nextConnection(), excludeHost);
|
||||||
if (connectionIterator.hasNext()) {
|
|
||||||
sniff(connectionIterator, excludeHost);
|
|
||||||
} else {
|
|
||||||
Connection connection = lastResortConnection();
|
|
||||||
logger.info("no healthy nodes available, trying " + connection.getHost());
|
|
||||||
sniff(Collections.singleton(connection).iterator(), excludeHost);
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
logger.error("error while sniffing nodes", t);
|
logger.error("error while sniffing nodes", t);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue