make host state immutable

Instead of having a Connection mutable object that holds the state of the connection to each host, we now have immutable objects only. We keep two sets, one with all the hosts, one with the blacklisted ones. Once we blacklist a host we associate it with a DeadHostState which keeps track of the number of failed attempts and when the host should be retried. A new state object is created each and every time the state of the host needs to be updated.
This commit is contained in:
javanna 2016-05-25 18:37:10 +02:00 committed by Luca Cavanna
parent e81aad972a
commit 6490355cb6
5 changed files with 144 additions and 156 deletions

View File

@ -1,94 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpHost;
import java.util.concurrent.TimeUnit;
/**
 * Represents a connection to a host. It holds the host that the connection points to and the state of the connection to it.
 * Mutable and thread-safe: state transitions happen under {@code synchronized (this)} while reads go through volatile fields.
 */
public class Connection {
//base dead interval applied after the first failure (1 minute)
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
//upper bound for the dead interval no matter how many failures accumulate (30 minutes)
private static final long MAX_CONNECTION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
private final HttpHost host;
//number of consecutive failures registered so far; 0 means the connection is considered alive
private volatile int failedAttempts = 0;
//System.nanoTime() deadline until which the connection stays dead; -1 while the connection is alive
private volatile long deadUntil = -1;
/**
 * Creates a new connection pointing to the provided {@link HttpHost} argument
 */
public Connection(HttpHost host) {
this.host = host;
}
/**
 * Returns the {@link HttpHost} that the connection points to
 */
public HttpHost getHost() {
return host;
}
/**
 * Marks connection as dead. Should be called in case the corresponding node is not responding or caused failures.
 * Once marked dead, the number of failed attempts will be incremented on each call to this method. A dead connection
 * should be retried once {@link #isBlacklisted()} returns true, which depends on the number of previous failed attempts
 * and when the last failure was registered.
 */
void markDead() {
synchronized (this) {
//clamp to a non-negative value before feeding the counter into the backoff formula
int failedAttempts = Math.max(this.failedAttempts, 0);
//exponential backoff: 1 minute * 2 * 2^(attempts * 0.5 - 1), i.e. 1 minute after the first
//failure, growing by a factor of sqrt(2) per subsequent failure, capped at 30 minutes
long timeoutMillis = (long)Math.min(DEFAULT_CONNECTION_TIMEOUT_MILLIS * 2 * Math.pow(2, failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_MILLIS);
this.deadUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
//deadUntil is written before failedAttempts so a reader of isBlacklisted() that observes
//failedAttempts > 0 also observes the matching (or a newer) deadline via the volatile reads
this.failedAttempts = ++failedAttempts;
}
}
/**
 * Marks this connection alive. Should be called when the corresponding node is working properly.
 * Will reset the number of failed attempts that were counted in case the connection was previously dead, as well as its timeout.
 */
void markAlive() {
//unsynchronized fast path: skip locking entirely when the connection is already alive
if (this.failedAttempts > 0) {
synchronized (this) {
this.deadUntil = -1;
this.failedAttempts = 0;
}
}
}
/**
 * Returns the timestamp till the connection is supposed to stay dead. After that moment the connection should be retried
 */
public long getDeadUntil() {
return deadUntil;
}
/**
 * Returns true when the connection should be skipped due to previous failures, false in case the connection is alive
 * or dead but ready to be retried. When the connection is dead, returns false when it is time to retry it, depending
 * on how many failed attempts were registered and when the last failure happened (minimum 1 minute, maximum 30 minutes).
 */
public boolean isBlacklisted() {
//subtraction-based comparison is the recommended idiom for System.nanoTime() values,
//as it stays correct even across numerical overflow of the nano clock
return failedAttempts > 0 && System.nanoTime() - deadUntil < 0;
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import java.util.concurrent.TimeUnit;
/**
 * Immutable snapshot of the blacklist state of a single host: how many attempts against it have failed so far,
 * and the {@link System#nanoTime()} deadline before which it should not be retried.
 * Since instances are immutable, updating the state of a host means creating a brand new instance
 * derived from the previous one.
 */
class DeadHostState {
private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
//NOTE(review): the deadline of this shared instance is computed once, at class-initialization time,
//so a host blacklisted long after startup may start out already past its retry deadline — confirm intended
static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState();
private final int failedAttempts;
private final long deadUntil;
/**
 * State after the very first failure: one failed attempt, retry allowed once the minimum timeout has elapsed.
 */
private DeadHostState() {
failedAttempts = 1;
deadUntil = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
}
/**
 * Derives the next state from the previous one: one more failed attempt, and an exponentially longer
 * timeout (growing by a factor of sqrt(2) per failure, starting from one minute), capped at 30 minutes.
 */
DeadHostState(DeadHostState previous) {
failedAttempts = previous.failedAttempts + 1;
double backoffNanos = MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previous.failedAttempts * 0.5 - 1);
deadUntil = System.nanoTime() + (long)Math.min(backoffNanos, MAX_CONNECTION_TIMEOUT_NANOS);
}
/**
 * Returns the {@link System#nanoTime()} timestamp till which the host is supposed to stay dead
 * without being retried. After that moment the host should be retried.
 */
long getDeadUntil() {
return deadUntil;
}
}

View File

@ -47,11 +47,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -62,23 +66,25 @@ public final class RestClient implements Closeable {
private final CloseableHttpClient client;
private final long maxRetryTimeout;
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
private volatile List<Connection> connections;
private final AtomicInteger lastHostIndex = new AtomicInteger(0);
private volatile Set<HttpHost> hosts;
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
private volatile FailureListener failureListener = new FailureListener();
private RestClient(CloseableHttpClient client, long maxRetryTimeout, HttpHost... hosts) {
this.client = client;
this.maxRetryTimeout = maxRetryTimeout;
setNodes(hosts);
setHosts(hosts);
}
public synchronized void setNodes(HttpHost... hosts) {
List<Connection> connections = new ArrayList<>(hosts.length);
public synchronized void setHosts(HttpHost... hosts) {
Set<HttpHost> httpHosts = new HashSet<>();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new Connection(host));
httpHosts.add(host);
}
this.connections = Collections.unmodifiableList(connections);
this.hosts = Collections.unmodifiableSet(httpHosts);
this.blacklist.clear();
}
public ElasticsearchResponse performRequest(String method, String endpoint, Map<String, String> params,
@ -94,9 +100,9 @@ public final class RestClient implements Closeable {
long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98);
IOException lastSeenException = null;
long startTime = System.nanoTime();
Iterator<Connection> connectionIterator = nextConnection();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
Iterator<HttpHost> hostIterator = nextHost();
while (hostIterator.hasNext()) {
HttpHost host = hostIterator.next();
if (lastSeenException != null) {
long timeElapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
@ -112,22 +118,22 @@ public final class RestClient implements Closeable {
CloseableHttpResponse response;
try {
response = client.execute(connection.getHost(), request);
response = client.execute(host, request);
} catch(IOException e) {
RequestLogger.log(logger, "request failed", request, connection.getHost(), e);
onFailure(connection);
RequestLogger.log(logger, "request failed", request, host, e);
onFailure(host);
lastSeenException = addSuppressedException(lastSeenException, e);
continue;
}
ElasticsearchResponse elasticsearchResponse = new ElasticsearchResponse(request.getRequestLine(),
connection.getHost(), response);
host, response);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode < 300 || (request.getMethod().equals(HttpHead.METHOD_NAME) && statusCode == 404) ) {
RequestLogger.log(logger, "request succeeded", request, connection.getHost(), response);
onSuccess(connection);
RequestLogger.log(logger, "request succeeded", request, host, response);
onSuccess(host);
return elasticsearchResponse;
} else {
RequestLogger.log(logger, "request failed", request, connection.getHost(), response);
RequestLogger.log(logger, "request failed", request, host, response);
String responseBody;
try {
if (elasticsearchResponse.getEntity() == null) {
@ -143,10 +149,10 @@ public final class RestClient implements Closeable {
lastSeenException = addSuppressedException(lastSeenException, elasticsearchResponseException);
//clients don't retry on 500 because elasticsearch still misuses it instead of 400 in some places
if (statusCode == 502 || statusCode == 503 || statusCode == 504) {
onFailure(connection);
onFailure(host);
} else {
//don't retry and call onSuccess as the error should be a request problem, the node is alive
onSuccess(connection);
onSuccess(host);
break;
}
}
@ -156,63 +162,78 @@ public final class RestClient implements Closeable {
}
/**
* Returns an iterator of connections that should be used for a request call.
* Ideally, the first connection is retrieved from the iterator and used successfully for the request.
* Otherwise, after each failure the next connection should be retrieved from the iterator so that the request can be retried.
* The maximum total of attempts is equal to the number of connections that are available in the iterator.
* The iterator returned will never be empty, rather an {@link IllegalStateException} will be thrown in that case.
* In case there are no alive connections available, or dead ones that should be retried, one dead connection
* gets resurrected and returned.
* Returns an iterator of hosts to be used for a request call.
* Ideally, the first host is retrieved from the iterator and used successfully for the request.
* Otherwise, after each failure the next host should be retrieved from the iterator so that the request can be retried till
* the iterator is exhausted. The maximum total of attempts is equal to the number of hosts that are available in the iterator.
The iterator returned will never be empty; rather an {@link IllegalStateException} will be thrown in case there are no hosts.
In case there are no healthy hosts available, or dead ones to be retried, one dead host gets returned.
*/
private Iterator<Connection> nextConnection() {
if (this.connections.isEmpty()) {
throw new IllegalStateException("no connections available");
private Iterator<HttpHost> nextHost() {
if (this.hosts.isEmpty()) {
throw new IllegalStateException("no hosts available");
}
List<Connection> rotatedConnections = new ArrayList<>(connections);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(rotatedConnections, rotatedConnections.size() - lastConnectionIndex.getAndIncrement());
Iterator<Connection> connectionIterator = rotatedConnections.iterator();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
if (connection.isBlacklisted()) {
connectionIterator.remove();
Set<HttpHost> filteredHosts = new HashSet<>(hosts);
for (Map.Entry<HttpHost, DeadHostState> entry : blacklist.entrySet()) {
if (System.nanoTime() - entry.getValue().getDeadUntil() < 0) {
filteredHosts.remove(entry.getKey());
}
}
if (rotatedConnections.isEmpty()) {
List<Connection> sortedConnections = new ArrayList<>(connections);
Collections.sort(sortedConnections, new Comparator<Connection>() {
if (filteredHosts.isEmpty()) {
//last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
List<Map.Entry<HttpHost, DeadHostState>> sortedHosts = new ArrayList<>(blacklist.entrySet());
Collections.sort(sortedHosts, new Comparator<Map.Entry<HttpHost, DeadHostState>>() {
@Override
public int compare(Connection o1, Connection o2) {
return Long.compare(o1.getDeadUntil(), o2.getDeadUntil());
public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, DeadHostState> o2) {
return Long.compare(o1.getValue().getDeadUntil(), o2.getValue().getDeadUntil());
}
});
Connection connection = sortedConnections.get(0);
logger.trace("trying to resurrect connection for " + connection.getHost());
return Collections.singleton(connection).iterator();
HttpHost deadHost = sortedHosts.get(0).getKey();
logger.trace("resurrecting host [" + deadHost + "]");
return Collections.singleton(deadHost).iterator();
}
return rotatedConnections.iterator();
List<HttpHost> rotatedHosts = new ArrayList<>(filteredHosts);
//TODO is it possible to make this O(1)? (rotate is O(n))
Collections.rotate(rotatedHosts, rotatedHosts.size() - lastHostIndex.getAndIncrement());
return rotatedHosts.iterator();
}
/**
* Called after each successful request call.
* Receives as an argument the connection that was used for the successful request.
* Receives as an argument the host that was used for the successful request.
*/
public void onSuccess(Connection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getHost());
private void onSuccess(HttpHost host) {
DeadHostState removedHost = this.blacklist.remove(host);
if (logger.isDebugEnabled() && removedHost != null) {
logger.debug("removed host [" + host + "] from blacklist");
}
}
/**
* Called after each failed attempt.
* Receives as an argument the connection that was used for the failed attempt.
* Receives as an argument the host that was used for the failed attempt.
*/
private void onFailure(Connection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getHost());
failureListener.onFailure(connection);
private void onFailure(HttpHost host) throws IOException {
while(true) {
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE);
if (previousDeadHostState == null) {
logger.debug("added host [" + host + "] to blacklist");
break;
}
if (blacklist.replace(host, previousDeadHostState, new DeadHostState(previousDeadHostState))) {
logger.debug("updated host [" + host + "] already in blacklist");
break;
}
}
failureListener.onFailure(host);
}
/**
* Sets a {@link FailureListener} to be notified each and every time a host fails
*/
public synchronized void setFailureListener(FailureListener failureListener) {
this.failureListener = failureListener;
}
@ -397,7 +418,11 @@ public final class RestClient implements Closeable {
* The default implementation is a no-op.
*/
public static class FailureListener {
public void onFailure(Connection connection) throws IOException {
/**
* Notifies that the host provided as argument has just failed
*/
public void onFailure(HttpHost host) throws IOException {
}
}

View File

@ -23,7 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.elasticsearch.client.Connection;
import org.elasticsearch.client.RestClient;
import java.io.Closeable;
@ -57,10 +56,10 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
}
@Override
public void onFailure(Connection connection) throws IOException {
public void onFailure(HttpHost host) throws IOException {
if (sniffOnFailure) {
//re-sniff immediately but take out the node that failed
task.sniffOnFailure(connection.getHost());
task.sniffOnFailure(host);
}
}
@ -108,7 +107,7 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
sniffedNodes.remove(excludeHost);
}
logger.debug("sniffed nodes: " + sniffedNodes);
this.restClient.setNodes(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()]));
this.restClient.setHosts(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()]));
} catch (Throwable t) {
logger.error("error while sniffing nodes", t);
} finally {

View File

@ -107,8 +107,8 @@ public class SnifferBuilderTests extends LuceneTestCase {
if (random().nextBoolean()) {
builder.setSniffOnFailure(random().nextBoolean());
}
try (Sniffer connectionPool = builder.build()) {
assertNotNull(connectionPool);
try (Sniffer sniffer = builder.build()) {
assertNotNull(sniffer);
}
}
}