REST client: hosts marked dead for the first time should not be immediately retried (#29230)
This was the plan from day one but due to a silly bug nodes were immediately retried after they were marked as dead for the first time. From the second time on, the expected backoff was applied.
This commit is contained in:
parent
d1d3edf156
commit
13f9e922f3
|
@ -26,31 +26,50 @@ import java.util.concurrent.TimeUnit;
|
|||
* when the host should be retried (based on number of previous failed attempts).
|
||||
* Class is immutable, a new copy of it should be created each time the state has to be changed.
|
||||
*/
|
||||
final class DeadHostState {
|
||||
final class DeadHostState implements Comparable<DeadHostState> {
|
||||
|
||||
private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
|
||||
private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
|
||||
|
||||
static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState();
|
||||
|
||||
private final int failedAttempts;
|
||||
private final long deadUntilNanos;
|
||||
private final TimeSupplier timeSupplier;
|
||||
|
||||
private DeadHostState() {
|
||||
/**
|
||||
* Build the initial dead state of a host. Useful when a working host stops functioning
|
||||
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
|
||||
*
|
||||
* @param timeSupplier a way to supply the current time and allow for unit testing
|
||||
*/
|
||||
DeadHostState(TimeSupplier timeSupplier) {
|
||||
this.failedAttempts = 1;
|
||||
this.deadUntilNanos = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
|
||||
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
|
||||
this.timeSupplier = timeSupplier;
|
||||
}
|
||||
|
||||
/**
|
||||
* We keep track of how many times a certain node fails consecutively. The higher that number is the longer we will wait
|
||||
* to retry that same node again. Minimum is 1 minute (for a node the only failed once), maximum is 30 minutes (for a node
|
||||
* that failed many consecutive times).
|
||||
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
|
||||
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
|
||||
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
|
||||
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
|
||||
*
|
||||
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
|
||||
*/
|
||||
DeadHostState(DeadHostState previousDeadHostState) {
|
||||
DeadHostState(DeadHostState previousDeadHostState, TimeSupplier timeSupplier) {
|
||||
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
|
||||
MAX_CONNECTION_TIMEOUT_NANOS);
|
||||
this.deadUntilNanos = System.nanoTime() + timeoutNanos;
|
||||
this.deadUntilNanos = timeSupplier.nanoTime() + timeoutNanos;
|
||||
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
|
||||
this.timeSupplier = timeSupplier;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether it's time to retry to failed host or not.
|
||||
*
|
||||
* @return true if the host should be retried, false otherwise
|
||||
*/
|
||||
boolean shallBeRetried() {
|
||||
return timeSupplier.nanoTime() - deadUntilNanos > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -61,6 +80,15 @@ final class DeadHostState {
|
|||
return deadUntilNanos;
|
||||
}
|
||||
|
||||
int getFailedAttempts() {
|
||||
return failedAttempts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(DeadHostState other) {
|
||||
return Long.compare(deadUntilNanos, other.deadUntilNanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DeadHostState{" +
|
||||
|
@ -68,4 +96,19 @@ final class DeadHostState {
|
|||
", deadUntilNanos=" + deadUntilNanos +
|
||||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* Time supplier that makes timing aspects pluggable to ease testing
|
||||
*/
|
||||
interface TimeSupplier {
|
||||
|
||||
TimeSupplier DEFAULT = new TimeSupplier() {
|
||||
@Override
|
||||
public long nanoTime() {
|
||||
return System.nanoTime();
|
||||
}
|
||||
};
|
||||
|
||||
long nanoTime();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.http.nio.client.methods.HttpAsyncMethods;
|
|||
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
|
||||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
|
||||
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
|
@ -72,7 +73,6 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
|
||||
/**
|
||||
* Client that connects to an Elasticsearch cluster through HTTP.
|
||||
|
@ -457,18 +457,18 @@ public class RestClient implements Closeable {
|
|||
do {
|
||||
Set<HttpHost> filteredHosts = new HashSet<>(hostTuple.hosts);
|
||||
for (Map.Entry<HttpHost, DeadHostState> entry : blacklist.entrySet()) {
|
||||
if (System.nanoTime() - entry.getValue().getDeadUntilNanos() < 0) {
|
||||
if (entry.getValue().shallBeRetried() == false) {
|
||||
filteredHosts.remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
if (filteredHosts.isEmpty()) {
|
||||
//last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried
|
||||
//last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
|
||||
List<Map.Entry<HttpHost, DeadHostState>> sortedHosts = new ArrayList<>(blacklist.entrySet());
|
||||
if (sortedHosts.size() > 0) {
|
||||
Collections.sort(sortedHosts, new Comparator<Map.Entry<HttpHost, DeadHostState>>() {
|
||||
@Override
|
||||
public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, DeadHostState> o2) {
|
||||
return Long.compare(o1.getValue().getDeadUntilNanos(), o2.getValue().getDeadUntilNanos());
|
||||
return o1.getValue().compareTo(o2.getValue());
|
||||
}
|
||||
});
|
||||
HttpHost deadHost = sortedHosts.get(0).getKey();
|
||||
|
@ -499,14 +499,15 @@ public class RestClient implements Closeable {
|
|||
* Called after each failed attempt.
|
||||
* Receives as an argument the host that was used for the failed attempt.
|
||||
*/
|
||||
private void onFailure(HttpHost host) throws IOException {
|
||||
private void onFailure(HttpHost host) {
|
||||
while(true) {
|
||||
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE);
|
||||
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, new DeadHostState(DeadHostState.TimeSupplier.DEFAULT));
|
||||
if (previousDeadHostState == null) {
|
||||
logger.debug("added host [" + host + "] to blacklist");
|
||||
break;
|
||||
}
|
||||
if (blacklist.replace(host, previousDeadHostState, new DeadHostState(previousDeadHostState))) {
|
||||
if (blacklist.replace(host, previousDeadHostState,
|
||||
new DeadHostState(previousDeadHostState, DeadHostState.TimeSupplier.DEFAULT))) {
|
||||
logger.debug("updated host [" + host + "] already in blacklist");
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
public class DeadHostStateTests extends RestClientTestCase {
|
||||
|
||||
private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};
|
||||
|
||||
public void testInitialDeadHostStateDefaultTimeSupplier() {
|
||||
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
|
||||
long currentTime = System.nanoTime();
|
||||
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(currentTime));
|
||||
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
|
||||
}
|
||||
|
||||
public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
|
||||
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
|
||||
int iters = randomIntBetween(5, 30);
|
||||
for (int i = 0; i < iters; i++) {
|
||||
DeadHostState deadHostState = new DeadHostState(previous, DeadHostState.TimeSupplier.DEFAULT);
|
||||
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(previous.getDeadUntilNanos()));
|
||||
assertThat(deadHostState.getFailedAttempts(), equalTo(previous.getFailedAttempts() + 1));
|
||||
previous = deadHostState;
|
||||
}
|
||||
}
|
||||
|
||||
public void testCompareToDefaultTimeSupplier() {
|
||||
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
|
||||
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
|
||||
for (int i = 0; i < numObjects; i++) {
|
||||
if (i == 0) {
|
||||
deadHostStates[i] = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
|
||||
} else {
|
||||
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1], DeadHostState.TimeSupplier.DEFAULT);
|
||||
}
|
||||
}
|
||||
for (int k = 1; k < deadHostStates.length; k++) {
|
||||
assertThat(deadHostStates[k - 1].getDeadUntilNanos(), lessThan(deadHostStates[k].getDeadUntilNanos()));
|
||||
assertThat(deadHostStates[k - 1], lessThan(deadHostStates[k]));
|
||||
}
|
||||
}
|
||||
|
||||
public void testShallBeRetried() {
|
||||
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
|
||||
DeadHostState deadHostState = null;
|
||||
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
|
||||
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
|
||||
timeSupplier.nanoTime = 0;
|
||||
if (i == 0) {
|
||||
deadHostState = new DeadHostState(timeSupplier);
|
||||
} else {
|
||||
deadHostState = new DeadHostState(deadHostState, timeSupplier);
|
||||
}
|
||||
for (int j = 0; j < expectedTimeoutSecond; j++) {
|
||||
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
|
||||
assertThat(deadHostState.shallBeRetried(), is(false));
|
||||
}
|
||||
int iters = randomIntBetween(5, 30);
|
||||
for (int j = 0; j < iters; j++) {
|
||||
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
|
||||
assertThat(deadHostState.shallBeRetried(), is(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeadHostStateTimeouts() {
|
||||
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
|
||||
zeroTimeSupplier.nanoTime = 0L;
|
||||
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
|
||||
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
|
||||
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
|
||||
previous = new DeadHostState(previous, zeroTimeSupplier);
|
||||
}
|
||||
//check that from here on the timeout does not increase
|
||||
int iters = randomIntBetween(5, 30);
|
||||
for (int i = 0; i < iters; i++) {
|
||||
DeadHostState deadHostState = new DeadHostState(previous, zeroTimeSupplier);
|
||||
assertThat(TimeUnit.NANOSECONDS.toSeconds(deadHostState.getDeadUntilNanos()),
|
||||
equalTo(EXPECTED_TIMEOUTS_SECONDS[EXPECTED_TIMEOUTS_SECONDS.length - 1]));
|
||||
previous = deadHostState;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
|
||||
|
||||
long nanoTime;
|
||||
|
||||
@Override
|
||||
public long nanoTime() {
|
||||
return nanoTime;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -101,7 +101,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void createRestClient() throws IOException {
|
||||
public void createRestClient() {
|
||||
httpClient = mock(CloseableHttpAsyncClient.class);
|
||||
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
|
||||
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
|
||||
|
@ -160,17 +160,6 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
exec.shutdown();
|
||||
}
|
||||
|
||||
public void testNullPath() throws IOException {
|
||||
for (String method : getHttpMethods()) {
|
||||
try {
|
||||
restClient.performRequest(method, null);
|
||||
fail("path set to null should fail!");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("path must not be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
|
||||
*/
|
||||
|
@ -196,33 +185,6 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSetHosts() throws IOException {
|
||||
try {
|
||||
restClient.setHosts((HttpHost[]) null);
|
||||
fail("setHosts should have failed");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("hosts must not be null nor empty", e.getMessage());
|
||||
}
|
||||
try {
|
||||
restClient.setHosts();
|
||||
fail("setHosts should have failed");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("hosts must not be null nor empty", e.getMessage());
|
||||
}
|
||||
try {
|
||||
restClient.setHosts((HttpHost) null);
|
||||
fail("setHosts should have failed");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("host cannot be null", e.getMessage());
|
||||
}
|
||||
try {
|
||||
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
|
||||
fail("setHosts should have failed");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("host cannot be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End to end test for ok status codes
|
||||
*/
|
||||
|
@ -289,7 +251,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testIOExceptions() throws IOException {
|
||||
public void testIOExceptions() {
|
||||
for (String method : getHttpMethods()) {
|
||||
//IOExceptions should be let bubble up
|
||||
try {
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.net.URI;
|
|||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
@ -147,8 +148,48 @@ public class RestClientTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSetHostsWrongArguments() throws IOException {
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
restClient.setHosts((HttpHost[]) null);
|
||||
fail("setHosts should have failed");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("hosts must not be null nor empty", e.getMessage());
|
||||
}
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
restClient.setHosts();
|
||||
fail("setHosts should have failed");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("hosts must not be null nor empty", e.getMessage());
|
||||
}
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
restClient.setHosts((HttpHost) null);
|
||||
fail("setHosts should have failed");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("host cannot be null", e.getMessage());
|
||||
}
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
|
||||
fail("setHosts should have failed");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("host cannot be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testNullPath() throws IOException {
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
for (String method : getHttpMethods()) {
|
||||
try {
|
||||
restClient.performRequest(method, null);
|
||||
fail("path set to null should fail!");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("path must not be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static RestClient createRestClient() {
|
||||
HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200)};
|
||||
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), new Header[]{}, hosts, null, null);
|
||||
return new RestClient(mock(CloseableHttpAsyncClient.class), randomIntBetween(1_000, 30_000), new Header[]{}, hosts, null, null);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue