remove ConnectionPool interface

This commit is contained in:
javanna 2016-05-09 12:28:47 +02:00 committed by Luca Cavanna
parent 17a21f0272
commit e040d2fc77
5 changed files with 76 additions and 121 deletions

View File

@ -1,103 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
* Base static connection pool implementation that marks connections as dead/alive when needed.
* Provides a stream of alive connections or dead ones that should be retried for each {@link #nextConnection()} call.
* In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()}
* and resurrected so that a last resort request attempt can be performed.
* The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive.
* The {@link #onFailure(Connection)} method marks the connection provided as an argument dead.
* This base implementation doesn't define the list implementation that stores connections, so that concurrency can be
* handled in subclasses depending on the usecase (e.g. defining the list volatile or final when needed).
*/
public abstract class AbstractStaticConnectionPool implements ConnectionPool {
private static final Log logger = LogFactory.getLog(AbstractStaticConnectionPool.class);
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
/**
* Allows to retrieve the concrete list of connections. Not defined directly as a member
* of this class as subclasses may need to handle concurrency if the list can change, for
* instance defining the field as volatile. On the other hand static implementations
* can just make the list final instead.
*/
protected abstract List<Connection> getConnections();
@Override
public final Stream<Connection> nextConnection() {
List<Connection> connections = getConnections();
if (connections.isEmpty()) {
throw new IllegalStateException("no connections available in the connection pool");
}
List<Connection> sortedConnections = new ArrayList<>(connections);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement());
return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried());
}
/**
* Helper method to be used by subclasses when needing to create a new list
* of connections given their corresponding hosts
*/
protected List<Connection> createConnections(HttpHost... hosts) {
List<Connection> connections = new ArrayList<>();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new Connection(host));
}
return Collections.unmodifiableList(connections);
}
@Override
public Connection lastResortConnection() {
Connection Connection = getConnections().stream()
.sorted((o1, o2) -> Long.compare(o1.getDeadUntil(), o2.getDeadUntil())).findFirst().get();
Connection.markResurrected();
return Connection;
}
@Override
public void onSuccess(Connection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getHost());
}
@Override
public void onFailure(Connection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getHost());
}
}

View File

@ -19,16 +19,44 @@
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
* Pool of connections to the different hosts that belong to an elasticsearch cluster.
* It keeps track of the different hosts to communicate with and allows to retrieve a stream of connections to be used
* for each request. Exposes the needed hooks to be able to eventually mark connections dead or alive.
* for each request. Marks connections as dead/alive when needed.
* Provides a stream of alive connections or dead ones that should be retried for each {@link #nextConnection()} call.
* In case the returned stream is empty a last resort dead connection should be retrieved by calling {@link #lastResortConnection()}
* and resurrected so that a last resort request attempt can be performed.
* The {@link #onSuccess(Connection)} method marks the connection provided as an argument alive.
* The {@link #onFailure(Connection)} method marks the connection provided as an argument dead.
* This base implementation doesn't define the list implementation that stores connections, so that concurrency can be
* handled in subclasses depending on the usecase (e.g. defining the list volatile or final when needed).
*/
public interface ConnectionPool extends Closeable {
public abstract class ConnectionPool implements Closeable {
private static final Log logger = LogFactory.getLog(ConnectionPool.class);
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
/**
* Allows to retrieve the concrete list of connections. Not defined directly as a member
* of this class as subclasses may need to handle concurrency if the list can change, for
* instance defining the field as volatile. On the other hand static implementations
* can just make the list final instead.
*/
protected abstract List<Connection> getConnections();
/**
* Returns a stream of connections that should be used for a request call.
@ -38,23 +66,57 @@ public interface ConnectionPool extends Closeable {
* It may happen that the stream is empty, in which case it means that there aren't healthy connections to use.
* Then {@link #lastResortConnection()} should be called to retrieve a non healthy connection and try it.
*/
Stream<Connection> nextConnection();
public final Stream<Connection> nextConnection() {
List<Connection> connections = getConnections();
if (connections.isEmpty()) {
throw new IllegalStateException("no connections available in the connection pool");
}
List<Connection> sortedConnections = new ArrayList<>(connections);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement());
return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried());
}
/**
* Helper method to be used by subclasses when needing to create a new list
* of connections given their corresponding hosts
*/
protected final List<Connection> createConnections(HttpHost... hosts) {
List<Connection> connections = new ArrayList<>();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new Connection(host));
}
return Collections.unmodifiableList(connections);
}
/**
* Returns a connection that is not necessarily healthy, but can be used for a request attempt. To be called as last resort
* only in case {@link #nextConnection()} returns an empty stream
* only in case {@link #nextConnection()} returns an empty stream.
*/
Connection lastResortConnection();
public final Connection lastResortConnection() {
Connection Connection = getConnections().stream()
.sorted((o1, o2) -> Long.compare(o1.getDeadUntil(), o2.getDeadUntil())).findFirst().get();
Connection.markResurrected();
return Connection;
}
/**
* Called after each successful request call.
* Receives as an argument the connection that was used for the successful request.
*/
void onSuccess(Connection connection);
public void onSuccess(Connection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getHost());
}
/**
* Called after each failed attempt.
* Receives as an argument the connection that was used for the failed attempt.
*/
void onFailure(Connection connection) throws IOException;
public void onFailure(Connection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getHost());
}
}

View File

@ -27,7 +27,7 @@ import java.util.List;
/**
* Static implementation of {@link ConnectionPool}. Its underlying list of connections is immutable.
*/
public class StaticConnectionPool extends AbstractStaticConnectionPool {
public class StaticConnectionPool extends ConnectionPool {
private final List<Connection> connections;

View File

@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.elasticsearch.client.AbstractStaticConnectionPool;
import org.elasticsearch.client.ConnectionPool;
import org.elasticsearch.client.Connection;
import java.io.IOException;
@ -43,7 +43,7 @@ import java.util.stream.Stream;
* Connection pool implementation that sniffs nodes from elasticsearch at regular intervals.
* Can optionally sniff nodes on each failure as well.
*/
public class SniffingConnectionPool extends AbstractStaticConnectionPool {
public class SniffingConnectionPool extends ConnectionPool {
private static final Log logger = LogFactory.getLog(SniffingConnectionPool.class);

View File

@ -25,8 +25,9 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.lucene.util.LuceneTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.logging.LogManager;
import java.util.stream.Stream;
public class RestClientTests extends LuceneTestCase {
@ -38,13 +39,8 @@ public class RestClientTests extends LuceneTestCase {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
ConnectionPool connectionPool = new ConnectionPool() {
@Override
public Stream<Connection> nextConnection() {
return null;
}
@Override
public Connection lastResortConnection() {
return null;
protected List<Connection> getConnections() {
return Collections.emptyList();
}
@Override