From 13f9e922f358897caf9f49fa2c9495a96382f48c Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 27 Mar 2018 16:15:44 +0200 Subject: [PATCH] REST client: hosts marked dead for the first time should not be immediately retried (#29230) This was the plan from day one but due to a silly bug nodes were immediately retried after they were marked as dead for the first time. From the second time on, the expected backoff was applied. --- .../elasticsearch/client/DeadHostState.java | 63 ++++++++-- .../org/elasticsearch/client/RestClient.java | 15 +-- .../client/DeadHostStateTests.java | 118 ++++++++++++++++++ .../client/RestClientSingleHostTests.java | 42 +------ .../elasticsearch/client/RestClientTests.java | 43 ++++++- 5 files changed, 223 insertions(+), 58 deletions(-) create mode 100644 client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java diff --git a/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java b/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java index a7b222da70e..452e71b14d9 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java +++ b/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java @@ -26,31 +26,50 @@ import java.util.concurrent.TimeUnit; * when the host should be retried (based on number of previous failed attempts). * Class is immutable, a new copy of it should be created each time the state has to be changed. */ -final class DeadHostState { +final class DeadHostState implements Comparable { private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1); private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30); - static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState(); - private final int failedAttempts; private final long deadUntilNanos; + private final TimeSupplier timeSupplier; - private DeadHostState() { + /** + * Build the initial dead state of a host. Useful when a working host stops functioning + * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so. + * + * @param timeSupplier a way to supply the current time and allow for unit testing + */ + DeadHostState(TimeSupplier timeSupplier) { this.failedAttempts = 1; - this.deadUntilNanos = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS; + this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS; + this.timeSupplier = timeSupplier; } /** - * We keep track of how many times a certain node fails consecutively. The higher that number is the longer we will wait - * to retry that same node again. Minimum is 1 minute (for a node the only failed once), maximum is 30 minutes (for a node - * that failed many consecutive times). + * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence + * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait + * to retry that same host again. Minimum is 1 minute (for a node the only failed once created + * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times) + * + * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt */ - DeadHostState(DeadHostState previousDeadHostState) { + DeadHostState(DeadHostState previousDeadHostState, TimeSupplier timeSupplier) { long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), MAX_CONNECTION_TIMEOUT_NANOS); - this.deadUntilNanos = System.nanoTime() + timeoutNanos; + this.deadUntilNanos = timeSupplier.nanoTime() + timeoutNanos; this.failedAttempts = previousDeadHostState.failedAttempts + 1; + this.timeSupplier = timeSupplier; + } + + /** + * Indicates whether it's time to retry to failed host or not. + * + * @return true if the host should be retried, false otherwise + */ + boolean shallBeRetried() { + return timeSupplier.nanoTime() - deadUntilNanos > 0; } /** @@ -61,6 +80,15 @@ final class DeadHostState { return deadUntilNanos; } + int getFailedAttempts() { + return failedAttempts; + } + + @Override + public int compareTo(DeadHostState other) { + return Long.compare(deadUntilNanos, other.deadUntilNanos); + } + @Override public String toString() { return "DeadHostState{" + @@ -68,4 +96,19 @@ final class DeadHostState { ", deadUntilNanos=" + deadUntilNanos + '}'; } + + /** + * Time supplier that makes timing aspects pluggable to ease testing + */ + interface TimeSupplier { + + TimeSupplier DEFAULT = new TimeSupplier() { + @Override + public long nanoTime() { + return System.nanoTime(); + } + }; + + long nanoTime(); + } } diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 4aa1a9d815c..48349c38589 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -47,6 +47,7 @@ import org.apache.http.nio.client.methods.HttpAsyncMethods; import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import javax.net.ssl.SSLHandshakeException; import java.io.Closeable; import java.io.IOException; import java.net.SocketTimeoutException; @@ -72,7 +73,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLHandshakeException; /** * Client that connects to an Elasticsearch cluster through HTTP. @@ -457,18 +457,18 @@ public class RestClient implements Closeable { do { Set filteredHosts = new HashSet<>(hostTuple.hosts); for (Map.Entry entry : blacklist.entrySet()) { - if (System.nanoTime() - entry.getValue().getDeadUntilNanos() < 0) { + if (entry.getValue().shallBeRetried() == false) { filteredHosts.remove(entry.getKey()); } } if (filteredHosts.isEmpty()) { - //last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried + //last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried List> sortedHosts = new ArrayList<>(blacklist.entrySet()); if (sortedHosts.size() > 0) { Collections.sort(sortedHosts, new Comparator>() { @Override public int compare(Map.Entry o1, Map.Entry o2) { - return Long.compare(o1.getValue().getDeadUntilNanos(), o2.getValue().getDeadUntilNanos()); + return o1.getValue().compareTo(o2.getValue()); } }); HttpHost deadHost = sortedHosts.get(0).getKey(); @@ -499,14 +499,15 @@ public class RestClient implements Closeable { * Called after each failed attempt. * Receives as an argument the host that was used for the failed attempt. */ - private void onFailure(HttpHost host) throws IOException { + private void onFailure(HttpHost host) { while(true) { - DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE); + DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, new DeadHostState(DeadHostState.TimeSupplier.DEFAULT)); if (previousDeadHostState == null) { logger.debug("added host [" + host + "] to blacklist"); break; } - if (blacklist.replace(host, previousDeadHostState, new DeadHostState(previousDeadHostState))) { + if (blacklist.replace(host, previousDeadHostState, + new DeadHostState(previousDeadHostState, DeadHostState.TimeSupplier.DEFAULT))) { logger.debug("updated host [" + host + "] already in blacklist"); break; } diff --git a/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java b/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java new file mode 100644 index 00000000000..75fbafd88f8 --- /dev/null +++ b/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +public class DeadHostStateTests extends RestClientTestCase { + + private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800}; + + public void testInitialDeadHostStateDefaultTimeSupplier() { + DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT); + long currentTime = System.nanoTime(); + assertThat(deadHostState.getDeadUntilNanos(), greaterThan(currentTime)); + assertThat(deadHostState.getFailedAttempts(), equalTo(1)); + } + + public void testDeadHostStateFromPreviousDefaultTimeSupplier() { + DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT); + int iters = randomIntBetween(5, 30); + for (int i = 0; i < iters; i++) { + DeadHostState deadHostState = new DeadHostState(previous, DeadHostState.TimeSupplier.DEFAULT); + assertThat(deadHostState.getDeadUntilNanos(), greaterThan(previous.getDeadUntilNanos())); + assertThat(deadHostState.getFailedAttempts(), equalTo(previous.getFailedAttempts() + 1)); + previous = deadHostState; + } + } + + public void testCompareToDefaultTimeSupplier() { + int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30); + DeadHostState[] deadHostStates = new DeadHostState[numObjects]; + for (int i = 0; i < numObjects; i++) { + if (i == 0) { + deadHostStates[i] = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT); + } else { + deadHostStates[i] = new DeadHostState(deadHostStates[i - 1], DeadHostState.TimeSupplier.DEFAULT); + } + } + for (int k = 1; k < deadHostStates.length; k++) { + assertThat(deadHostStates[k - 1].getDeadUntilNanos(), lessThan(deadHostStates[k].getDeadUntilNanos())); + assertThat(deadHostStates[k - 1], lessThan(deadHostStates[k])); + } + } + + public void testShallBeRetried() { + ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier(); + DeadHostState deadHostState = null; + for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) { + long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i]; + timeSupplier.nanoTime = 0; + if (i == 0) { + deadHostState = new DeadHostState(timeSupplier); + } else { + deadHostState = new DeadHostState(deadHostState, timeSupplier); + } + for (int j = 0; j < expectedTimeoutSecond; j++) { + timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1); + assertThat(deadHostState.shallBeRetried(), is(false)); + } + int iters = randomIntBetween(5, 30); + for (int j = 0; j < iters; j++) { + timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1); + assertThat(deadHostState.shallBeRetried(), is(true)); + } + } + } + + public void testDeadHostStateTimeouts() { + ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier(); + zeroTimeSupplier.nanoTime = 0L; + DeadHostState previous = new DeadHostState(zeroTimeSupplier); + for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) { + assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond)); + previous = new DeadHostState(previous, zeroTimeSupplier); + } + //check that from here on the timeout does not increase + int iters = randomIntBetween(5, 30); + for (int i = 0; i < iters; i++) { + DeadHostState deadHostState = new DeadHostState(previous, zeroTimeSupplier); + assertThat(TimeUnit.NANOSECONDS.toSeconds(deadHostState.getDeadUntilNanos()), + equalTo(EXPECTED_TIMEOUTS_SECONDS[EXPECTED_TIMEOUTS_SECONDS.length - 1])); + previous = deadHostState; + } + } + + private static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier { + + long nanoTime; + + @Override + public long nanoTime() { + return nanoTime; + } + } +} diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index caf9ce6be2e..7786eefb97f 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -101,7 +101,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { @Before @SuppressWarnings("unchecked") - public void createRestClient() throws IOException { + public void createRestClient() { httpClient = mock(CloseableHttpAsyncClient.class); when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { @@ -160,17 +160,6 @@ public class RestClientSingleHostTests extends RestClientTestCase { exec.shutdown(); } - public void testNullPath() throws IOException { - for (String method : getHttpMethods()) { - try { - restClient.performRequest(method, null); - fail("path set to null should fail!"); - } catch (NullPointerException e) { - assertEquals("path must not be null", e.getMessage()); - } - } - } - /** * Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client */ @@ -196,33 +185,6 @@ public class RestClientSingleHostTests extends RestClientTestCase { } } - public void testSetHosts() throws IOException { - try { - restClient.setHosts((HttpHost[]) null); - fail("setHosts should have failed"); - } catch (IllegalArgumentException e) { - assertEquals("hosts must not be null nor empty", e.getMessage()); - } - try { - restClient.setHosts(); - fail("setHosts should have failed"); - } catch (IllegalArgumentException e) { - assertEquals("hosts must not be null nor empty", e.getMessage()); - } - try { - restClient.setHosts((HttpHost) null); - fail("setHosts should have failed"); - } catch (NullPointerException e) { - assertEquals("host cannot be null", e.getMessage()); - } - try { - restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201)); - fail("setHosts should have failed"); - } catch (NullPointerException e) { - assertEquals("host cannot be null", e.getMessage()); - } - } - /** * End to end test for ok status codes */ @@ -289,7 +251,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { } } - public void testIOExceptions() throws IOException { + public void testIOExceptions() { for (String method : getHttpMethods()) { //IOExceptions should be let bubble up try { diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index 33323d39663..ee6dbf449bd 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -28,6 +28,7 @@ import java.net.URI; import java.util.Collections; import java.util.concurrent.CountDownLatch; +import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -147,8 +148,48 @@ public class RestClientTests extends RestClientTestCase { } } + public void testSetHostsWrongArguments() throws IOException { + try (RestClient restClient = createRestClient()) { + restClient.setHosts((HttpHost[]) null); + fail("setHosts should have failed"); + } catch (IllegalArgumentException e) { + assertEquals("hosts must not be null nor empty", e.getMessage()); + } + try (RestClient restClient = createRestClient()) { + restClient.setHosts(); + fail("setHosts should have failed"); + } catch (IllegalArgumentException e) { + assertEquals("hosts must not be null nor empty", e.getMessage()); + } + try (RestClient restClient = createRestClient()) { + restClient.setHosts((HttpHost) null); + fail("setHosts should have failed"); + } catch (NullPointerException e) { + assertEquals("host cannot be null", e.getMessage()); + } + try (RestClient restClient = createRestClient()) { + restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201)); + fail("setHosts should have failed"); + } catch (NullPointerException e) { + assertEquals("host cannot be null", e.getMessage()); + } + } + + public void testNullPath() throws IOException { + try (RestClient restClient = createRestClient()) { + for (String method : getHttpMethods()) { + try { + restClient.performRequest(method, null); + fail("path set to null should fail!"); + } catch (NullPointerException e) { + assertEquals("path must not be null", e.getMessage()); + } + } + } + } + private static RestClient createRestClient() { HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200)}; - return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), new Header[]{}, hosts, null, null); + return new RestClient(mock(CloseableHttpAsyncClient.class), randomIntBetween(1_000, 30_000), new Header[]{}, hosts, null, null); } }