merge Connection and StatefulConnection into one class, remove generic type from Transport

This commit is contained in:
javanna 2016-05-06 09:52:33 +02:00 committed by Luca Cavanna
parent a472544ab4
commit 062a21678c
10 changed files with 139 additions and 185 deletions
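For orientation, here is a minimal usage sketch of the client API after this change. It only uses constructors and methods that appear in the hunks below; the host, the timeout value and the exact import locations (package org.elasticsearch.client) are assumptions for illustration, not part of the commit.

import java.util.Collections;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
// package locations assumed from this commit's sources
import org.elasticsearch.client.ConnectionPool;
import org.elasticsearch.client.ElasticsearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.StaticConnectionPool;

public class RestClientSetupSketch {
    public static void main(String[] args) throws Exception {
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        // The pool is no longer parameterized: ConnectionPool instead of ConnectionPool<? extends Connection>.
        ConnectionPool connectionPool = new StaticConnectionPool(
                httpClient, false, RequestConfig.DEFAULT, new HttpHost("localhost", 9200));
        // RestClient now creates a non-generic Transport internally.
        try (RestClient restClient = new RestClient(httpClient, connectionPool, 10_000)) {
            ElasticsearchResponse response =
                    restClient.performRequest("GET", "/", Collections.emptyMap(), null);
            System.out.println(response);
        }
    }
}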

AbstractStaticConnectionPool.java

@@ -38,57 +38,57 @@ import java.util.stream.Stream;
* allows filtering connections through a customizable {@link Predicate}, called connection selector.
* In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()}
* and resurrected so that a single request attempt can be performed.
* The {@link #onSuccess(StatefulConnection)} method marks the connection provided as an argument alive.
* The {@link #onFailure(StatefulConnection)} method marks the connection provided as an argument dead.
* The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive.
* The {@link #onFailure(Connection)} method marks the connection provided as an argument dead.
* This base implementation doesn't define the list implementation that stores connections, so that concurrency can be
* handled in the subclasses depending on the use case (e.g. making the list volatile when needed).
*/
public abstract class AbstractStaticConnectionPool implements ConnectionPool<StatefulConnection> {
public abstract class AbstractStaticConnectionPool implements ConnectionPool {
private static final Log logger = LogFactory.getLog(AbstractStaticConnectionPool.class);
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
protected abstract List<StatefulConnection> getConnections();
protected abstract List<Connection> getConnections();
@Override
public final Stream<StatefulConnection> nextConnection() {
List<StatefulConnection> connections = getConnections();
public final Stream<Connection> nextConnection() {
List<Connection> connections = getConnections();
if (connections.isEmpty()) {
throw new IllegalStateException("no connections available in the connection pool");
}
List<StatefulConnection> sortedConnections = new ArrayList<>(connections);
List<Connection> sortedConnections = new ArrayList<>(connections);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement());
return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried());
}
protected List<StatefulConnection> createConnections(HttpHost... hosts) {
List<StatefulConnection> connections = new ArrayList<>();
protected List<Connection> createConnections(HttpHost... hosts) {
List<Connection> connections = new ArrayList<>();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new StatefulConnection(host));
connections.add(new Connection(host));
}
return Collections.unmodifiableList(connections);
}
@Override
public StatefulConnection lastResortConnection() {
StatefulConnection statefulConnection = getConnections().stream()
public Connection lastResortConnection() {
Connection connection = getConnections().stream()
.sorted((o1, o2) -> Long.compare(o1.getDeadUntil(), o2.getDeadUntil())).findFirst().get();
statefulConnection.markResurrected();
return statefulConnection;
connection.markResurrected();
return connection;
}
@Override
public void onSuccess(StatefulConnection connection) {
public void onSuccess(Connection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getHost());
}
@Override
public void onFailure(StatefulConnection connection) throws IOException {
public void onFailure(Connection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getHost());
}
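The round-robin behaviour of nextConnection() above hinges on Collections.rotate combined with the incrementing counter. A standalone sketch of the same trick (class and host names here are illustrative, not part of the commit) shows how the rotation spreads requests across hosts:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RotationSketch {
    private static final AtomicInteger lastIndex = new AtomicInteger(0);

    // Same idea as nextConnection(): rotate a copy of the list by (size - counter),
    // so each call brings a different element to the front. rotate itself is O(n),
    // which is what the TODO above is about.
    static List<String> nextOrdering(List<String> hosts) {
        List<String> sorted = new ArrayList<>(hosts);
        Collections.rotate(sorted, sorted.size() - lastIndex.getAndIncrement());
        return sorted;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("node-0", "node-1", "node-2");
        System.out.println(nextOrdering(hosts)); // [node-0, node-1, node-2]
        System.out.println(nextOrdering(hosts)); // [node-1, node-2, node-0]
        System.out.println(nextOrdering(hosts)); // [node-2, node-0, node-1]
    }
}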

Connection.java

@@ -21,15 +21,21 @@ package org.elasticsearch.client;
import org.apache.http.HttpHost;
import java.util.concurrent.TimeUnit;
/**
* Simplest representation of a connection to an elasticsearch node.
* It doesn't have any mutable state. It holds the host that the connection points to.
* Represents a connection to a host. It holds the host that the connection points to.
* Allows the transport to deal with very simple connection objects that are immutable.
* Any change to the state of connections should be made through the connection pool
* which is aware of the connection object that it supports.
* Any change to the state of a connection should be made through the connection pool.
*/
public class Connection {
//TODO make these values configurable through the connection pool?
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
private static final long MAX_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
private final HttpHost host;
private volatile State state = State.UNKNOWN;
private volatile int failedAttempts = -1;
private volatile long deadUntil = -1;
/**
* Creates a new connection pointing to the provided {@link HttpHost} argument
@@ -44,4 +50,78 @@ public class Connection {
public HttpHost getHost() {
return host;
}
/**
* Marks connection as dead. Should be called in case the corresponding node is not responding or caused failures.
* Once marked dead, the number of failed attempts will be incremented on each call to this method. A dead connection
* should be retried once {@link #shouldBeRetried()} returns true, which depends on the number of previous failed attempts
* and when the last failure was registered.
*/
void markDead() {
synchronized (this) {
int failedAttempts = Math.max(this.failedAttempts, 0);
long timeoutMillis = (long)Math.min(DEFAULT_CONNECTION_TIMEOUT_MILLIS * 2 * Math.pow(2, failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_MILLIS);
this.deadUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
this.failedAttempts = ++failedAttempts;
this.state = State.DEAD;
}
}
/**
* Marks this connection alive. Should be called when the corresponding node is working properly.
* Will reset the number of failed attempts that were counted in case the connection was previously dead,
* as well as its dead timeout.
*/
void markAlive() {
if (this.state != State.ALIVE) {
synchronized (this) {
this.deadUntil = -1;
this.failedAttempts = 0;
this.state = State.ALIVE;
}
}
}
/**
* Resets the connection to its initial state, so it will be retried. To be called when all the connections in the pool
* are dead, so that one connection can be retried. Note that calling this method only changes the state of the connection,
* it doesn't reset its failed attempts and dead until timestamp. That way if the connection goes back to dead straightaway
* all of its previous failed attempts are taken into account.
*/
void markResurrected() {
if (this.state == State.DEAD) {
synchronized (this) {
this.state = State.UNKNOWN;
}
}
}
/**
* Returns the timestamp until which the connection is supposed to stay dead before it can be retried
*/
public long getDeadUntil() {
return deadUntil;
}
/**
* Returns true if the connection is alive, false otherwise.
*/
public boolean isAlive() {
return state == State.ALIVE;
}
/**
* Returns true in case the connection is not alive but should be used/retried, false otherwise.
* Returns true in case the connection is in unknown state (never used before) or resurrected. When the connection is dead,
* returns true when it is time to retry it, depending on how many failed attempts were registered and when the last failure
* happened (minimum 1 minute, maximum 30 minutes).
*/
public boolean shouldBeRetried() {
return state == State.UNKNOWN || (state == State.DEAD && System.nanoTime() - deadUntil >= 0);
}
private enum State {
UNKNOWN, DEAD, ALIVE
}
}
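The markDead() timeout above follows min(1 minute * 2 * 2^(failedAttempts * 0.5 - 1), 30 minutes): the first failure marks the connection dead for one minute, and the period doubles every two consecutive failures until it is capped at 30 minutes. A small sketch of the same arithmetic (the loop bound is arbitrary):

import java.util.concurrent.TimeUnit;

public class BackoffExample {
    public static void main(String[] args) {
        long defaultTimeoutMillis = TimeUnit.MINUTES.toMillis(1);   // 60000
        long maxTimeoutMillis = TimeUnit.MINUTES.toMillis(30);      // 1800000
        for (int failedAttempts = 0; failedAttempts <= 10; failedAttempts++) {
            long timeoutMillis = (long) Math.min(
                    defaultTimeoutMillis * 2 * Math.pow(2, failedAttempts * 0.5 - 1), maxTimeoutMillis);
            // 0 -> 60000ms, 2 -> 120000ms, 4 -> 240000ms, 6 -> 480000ms, 8 -> 960000ms, 10 -> 1800000ms (capped)
            System.out.println(failedAttempts + " previous failures -> dead for " + timeoutMillis + " ms");
        }
    }
}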

ConnectionPool.java

@@ -24,14 +24,12 @@ import java.io.IOException;
import java.util.stream.Stream;
/**
* Pool of connections to the different nodes that belong to an elasticsearch cluster.
* It keeps track of the different nodes to communicate with and allows to retrieve a stream of connections to be used
* Pool of connections to the different hosts that belong to an elasticsearch cluster.
* It keeps track of the different hosts to communicate with and allows retrieving a stream of connections to be used
* for each request. Exposes the needed hooks to be able to eventually mark connections dead or alive and execute
* arbitrary operations before each single request attempt.
*
* @param <C> the type of {@link Connection} that the pool supports
*/
public interface ConnectionPool<C extends Connection> extends Closeable {
public interface ConnectionPool extends Closeable {
/**
* Returns a stream of connections that should be used for a request call.
@@ -41,29 +39,29 @@ public interface ConnectionPool<C extends Connection> extends Closeable {
* It may happen that the stream is empty, in which case it means that there aren't healthy connections to use.
* Then {@link #lastResortConnection()} should be called to retrieve a non healthy connection and try it.
*/
Stream<C> nextConnection();
Stream<Connection> nextConnection();
/**
* Returns a connection that is not necessarily healthy, but can be used for a request attempt. To be called as last resort
* only in case {@link #nextConnection()} returns an empty stream
*/
C lastResortConnection();
Connection lastResortConnection();
/**
* Called before each single request attempt. Allows to execute operations (e.g. ping) before each request.
* Receives as an argument the connection that is going to be used for the request.
*/
void beforeAttempt(C connection) throws IOException;
void beforeAttempt(Connection connection) throws IOException;
/**
* Called after each successful request call.
* Receives as an argument the connection that was used for the successful request.
*/
void onSuccess(C connection);
void onSuccess(Connection connection);
/**
* Called after each failed attempt.
* Receives as an argument the connection that was used for the failed attempt.
*/
void onFailure(C connection) throws IOException;
void onFailure(Connection connection) throws IOException;
}
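The hooks above are meant to be driven by the caller around every request attempt; the Transport changes later in this diff do exactly that. A condensed sketch of the expected lifecycle, assuming it sits in org.elasticsearch.client next to Connection and ConnectionPool (sendRequest is a hypothetical placeholder, not part of this commit):

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.Stream;

// assumed to live in org.elasticsearch.client alongside Connection and ConnectionPool
public class ConnectionPoolUsageSketch {

    static void execute(ConnectionPool pool) throws IOException {
        Iterator<Connection> connections = pool.nextConnection().iterator();
        if (connections.hasNext() == false) {
            // empty stream: fall back to the least recently failed connection
            connections = Stream.of(pool.lastResortConnection()).iterator();
        }
        IOException lastSeenException = null;
        while (connections.hasNext()) {
            Connection connection = connections.next();
            try {
                pool.beforeAttempt(connection); // e.g. optional ping
                sendRequest(connection);        // hypothetical placeholder for the actual http call
                pool.onSuccess(connection);     // marks the connection alive
                return;
            } catch (IOException e) {
                pool.onFailure(connection);     // marks the connection dead
                lastSeenException = e;
            }
        }
        throw lastSeenException;
    }

    private static void sendRequest(Connection connection) throws IOException {
        // hypothetical placeholder
    }
}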

ElasticsearchResponseException.java

@@ -34,7 +34,7 @@ public class ElasticsearchResponseException extends IOException {
private final RequestLine requestLine;
private final StatusLine statusLine;
ElasticsearchResponseException(RequestLine requestLine, HttpHost host, StatusLine statusLine) {
public ElasticsearchResponseException(RequestLine requestLine, HttpHost host, StatusLine statusLine) {
super(buildMessage(requestLine, host, statusLine));
this.host = host;
this.requestLine = requestLine;

RestClient.java

@@ -29,8 +29,8 @@ public final class RestClient implements Closeable {
private final Transport transport;
public RestClient(CloseableHttpClient client, ConnectionPool<? extends Connection> connectionPool, long maxRetryTimeout) {
this.transport = new Transport<>(client, connectionPool, maxRetryTimeout);
public RestClient(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) {
this.transport = new Transport(client, connectionPool, maxRetryTimeout);
}
public ElasticsearchResponse performRequest(String method, String endpoint, Map<String, Object> params, HttpEntity entity)

StatefulConnection.java (deleted)

@@ -1,127 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpHost;
import java.util.concurrent.TimeUnit;
/**
* {@link Connection} subclass that has a mutable state, based on previous usage.
* When first created, a connection is in unknown state, till it is used for the first time
* and marked either dead or alive based on the outcome of the first usage.
* Should be marked alive when properly working.
* Should be marked dead when it caused a failure, in which case the connection may be retried some time later,
* as soon as {@link #shouldBeRetried()} returns true, which depends on how many consecutive failed attempts
* were counted and when the last one was registered.
* Should be marked resurrected if in dead state, as last resort in case there are no live connections available
* and none of the dead ones are ready to be retried yet. When marked resurrected, the number of failed attempts
* and its timeout is not reset so that if it gets marked dead again it returns to the exact state before resurrection.
*/
public final class StatefulConnection extends Connection {
//TODO make these values configurable through the connection pool?
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
private static final long MAX_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
private volatile State state = State.UNKNOWN;
private volatile int failedAttempts = -1;
private volatile long deadUntil = -1;
/**
* Creates a new mutable connection pointing to the provided {@link HttpHost} argument
*/
public StatefulConnection(HttpHost host) {
super(host);
}
/**
* Marks connection as dead. Should be called in case the corresponding node is not responding or caused failures.
* Once marked dead, the number of failed attempts will be incremented on each call to this method. A dead connection
* should be retried once {@link #shouldBeRetried()} returns true, which depends on the number of previous failed attempts
* and when the last failure was registered.
*/
void markDead() {
synchronized (this) {
int failedAttempts = Math.max(this.failedAttempts, 0);
long timeoutMillis = (long)Math.min(DEFAULT_CONNECTION_TIMEOUT_MILLIS * 2 * Math.pow(2, failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_MILLIS);
this.deadUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
this.failedAttempts = ++failedAttempts;
this.state = State.DEAD;
}
}
/**
* Marks this connection alive. Should be called when the corresponding node is working properly.
* Will reset the number of failed attempts that were counted in case the connection was previously dead,
* as well as its dead timeout.
*/
void markAlive() {
if (this.state != State.ALIVE) {
synchronized (this) {
this.deadUntil = -1;
this.failedAttempts = 0;
this.state = State.ALIVE;
}
}
}
/**
* Resets the connection to its initial state, so it will be retried. To be called when all the connections in the pool
* are dead, so that one connection can be retried. Note that calling this method only changes the state of the connection,
* it doesn't reset its failed attempts and dead until timestamp. That way if the connection goes back to dead straightaway
* all of its previous failed attempts are taken into account.
*/
void markResurrected() {
if (this.state == State.DEAD) {
synchronized (this) {
this.state = State.UNKNOWN;
}
}
}
/**
* Returns the timestamp till the connection is supposed to stay dead till it can be retried
*/
public long getDeadUntil() {
return deadUntil;
}
/**
* Returns true if the connection is alive, false otherwise.
*/
public boolean isAlive() {
return state == State.ALIVE;
}
/**
* Returns true in case the connection is not alive but should be used/retried, false otherwise.
* Returns true in case the connection is in unknown state (never used before) or resurrected. When the connection is dead,
* returns true when it is time to retry it, depending on how many failed attempts were registered and when the last failure
* happened (minimum 1 minute, maximum 30 minutes).
*/
public boolean shouldBeRetried() {
return state == State.UNKNOWN || (state == State.DEAD && System.nanoTime() - deadUntil >= 0);
}
private enum State {
UNKNOWN, DEAD, ALIVE
}
}

StaticConnectionPool.java

@@ -40,7 +40,7 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool {
private final CloseableHttpClient client;
private final boolean pingEnabled;
private final RequestConfig pingRequestConfig;
private final List<StatefulConnection> connections;
private final List<Connection> connections;
public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig, HttpHost... hosts) {
Objects.requireNonNull(client, "client cannot be null");
@@ -55,12 +55,14 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool {
}
@Override
protected List<StatefulConnection> getConnections() {
protected List<Connection> getConnections() {
return connections;
}
//TODO do we still need pinging? seems like a workaround for some clients that don't support connect timeout but we have that
@Override
public void beforeAttempt(StatefulConnection connection) throws IOException {
public void beforeAttempt(Connection connection) throws IOException {
if (pingEnabled && connection.shouldBeRetried()) {
HttpHead httpHead = new HttpHead("/");
httpHead.setConfig(pingRequestConfig);

Transport.java

@@ -44,15 +44,15 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
final class Transport<C extends Connection> implements Closeable {
final class Transport implements Closeable {
private static final Log logger = LogFactory.getLog(Transport.class);
private final CloseableHttpClient client;
private final ConnectionPool<C> connectionPool;
private final ConnectionPool connectionPool;
private final long maxRetryTimeout;
Transport(CloseableHttpClient client, ConnectionPool<C> connectionPool, long maxRetryTimeout) {
Transport(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) {
Objects.requireNonNull(client, "client cannot be null");
Objects.requireNonNull(connectionPool, "connectionPool cannot be null");
if (maxRetryTimeout <= 0) {
@@ -66,23 +66,23 @@ final class Transport<C extends Connection> implements Closeable {
ElasticsearchResponse performRequest(String method, String endpoint, Map<String, Object> params, HttpEntity entity) throws IOException {
URI uri = buildUri(endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity);
Iterator<C> connectionIterator = connectionPool.nextConnection().iterator();
Iterator<Connection> connectionIterator = connectionPool.nextConnection().iterator();
if (connectionIterator.hasNext() == false) {
C connection = connectionPool.lastResortConnection();
Connection connection = connectionPool.lastResortConnection();
logger.info("no healthy nodes available, trying " + connection.getHost());
return performRequest(request, Stream.of(connection).iterator());
}
return performRequest(request, connectionIterator);
}
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<C> connectionIterator) throws IOException {
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<Connection> connectionIterator) throws IOException {
//we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt
long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98);
IOException lastSeenException = null;
long startTime = System.nanoTime();
while (connectionIterator.hasNext()) {
C connection = connectionIterator.next();
Connection connection = connectionIterator.next();
if (lastSeenException != null) {
long timeElapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
@@ -124,7 +124,7 @@ final class Transport<C extends Connection> implements Closeable {
throw lastSeenException;
}
private ElasticsearchResponse performRequest(HttpRequestBase request, C connection) throws IOException {
private ElasticsearchResponse performRequest(HttpRequestBase request, Connection connection) throws IOException {
CloseableHttpResponse response;
try {
response = client.execute(connection.getHost(), request);
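One detail worth calling out from the retry loop above: retryTimeout is 98% of maxRetryTimeout, so an attempt that has nearly exhausted the budget is not retried against another host and the last seen exception is rethrown instead. Worked numbers (the timeout value here is illustrative):

public class RetryBudgetExample {
    public static void main(String[] args) {
        long maxRetryTimeout = 10_000; // hypothetical value passed to the Transport constructor, in milliseconds
        long retryTimeout = Math.round(maxRetryTimeout / (float) 100 * 98);
        // effective budget is 9800 ms: if 9.9 s already elapsed since the first attempt,
        // the loop gives up rather than starting another attempt against the next host
        System.out.println("effective retry budget: " + retryTimeout + " ms");
    }
}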

SniffingConnectionPool.java

@@ -25,7 +25,7 @@ import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.elasticsearch.client.AbstractStaticConnectionPool;
import org.elasticsearch.client.StatefulConnection;
import org.elasticsearch.client.Connection;
import java.io.IOException;
import java.util.Iterator;
@@ -46,9 +46,10 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
private final boolean sniffOnFailure;
private final Sniffer sniffer;
private volatile List<StatefulConnection> connections;
private volatile List<Connection> connections;
private final SnifferTask snifferTask;
//TODO do we still need the sniff request timeout? or should we just use a low connect timeout?
public SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay,
CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, Scheme scheme,
HttpHost... hosts) {
@@ -69,17 +70,17 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
}
@Override
protected List<StatefulConnection> getConnections() {
protected List<Connection> getConnections() {
return this.connections;
}
@Override
public void beforeAttempt(StatefulConnection connection) throws IOException {
public void beforeAttempt(Connection connection) throws IOException {
}
@Override
public void onFailure(StatefulConnection connection) throws IOException {
public void onFailure(Connection connection) throws IOException {
super.onFailure(connection);
if (sniffOnFailure) {
//re-sniff immediately but take out the node that failed
@@ -130,11 +131,11 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
void sniff(Predicate<HttpHost> hostFilter) {
if (running.compareAndSet(false, true)) {
try {
Iterator<StatefulConnection> connectionIterator = nextConnection().iterator();
Iterator<Connection> connectionIterator = nextConnection().iterator();
if (connectionIterator.hasNext()) {
sniff(connectionIterator, hostFilter);
} else {
StatefulConnection connection = lastResortConnection();
Connection connection = lastResortConnection();
logger.info("no healthy nodes available, trying " + connection.getHost());
sniff(Stream.of(connection).iterator(), hostFilter);
}
@@ -160,10 +161,10 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
}
}
void sniff(Iterator<StatefulConnection> connectionIterator, Predicate<HttpHost> hostFilter) throws IOException {
void sniff(Iterator<Connection> connectionIterator, Predicate<HttpHost> hostFilter) throws IOException {
IOException lastSeenException = null;
while (connectionIterator.hasNext()) {
StatefulConnection connection = connectionIterator.next();
Connection connection = connectionIterator.next();
try {
List<HttpHost> sniffedNodes = sniffer.sniffNodes(connection.getHost());
HttpHost[] filteredNodes = sniffedNodes.stream().filter(hostFilter).toArray(HttpHost[]::new);

TransportTests.java

@@ -36,7 +36,7 @@ public class TransportTests extends LuceneTestCase {
public void testConstructor() {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
ConnectionPool<Connection> connectionPool = new ConnectionPool<Connection>() {
ConnectionPool connectionPool = new ConnectionPool() {
@Override
public Stream<Connection> nextConnection() {
return null;
@@ -69,27 +69,27 @@ public class TransportTests extends LuceneTestCase {
};
try {
new Transport<>(null, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
new Transport(null, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
fail("transport creation should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "client cannot be null");
}
try {
new Transport<>(httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
new Transport(httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
fail("transport creation should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "connectionPool cannot be null");
}
try {
new Transport<>(httpClient, connectionPool, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
new Transport(httpClient, connectionPool, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("transport creation should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "maxRetryTimeout must be greater than 0");
}
Transport<Connection> transport = new Transport<>(httpClient, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
Transport transport = new Transport(httpClient, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
assertNotNull(transport);
}
}