Adapt low-level REST client to java 8 (#41537)

As a follow-up to #38540 we can use lambda functions and method
references where convenient in the low-level REST client.

Also, we need to update the docs to state that the minimum java version
required is 1.8.
This commit is contained in:
Luca Cavanna 2019-05-03 11:15:24 +02:00
parent 3e231bbad6
commit e747326b04
8 changed files with 51 additions and 127 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
@ -30,10 +31,11 @@ final class DeadHostState implements Comparable<DeadHostState> {
private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
static final Supplier<Long> DEFAULT_TIME_SUPPLIER = System::nanoTime;
private final int failedAttempts;
private final long deadUntilNanos;
private final TimeSupplier timeSupplier;
private final Supplier<Long> timeSupplier;
/**
* Build the initial dead state of a host. Useful when a working host stops functioning
@ -41,9 +43,9 @@ final class DeadHostState implements Comparable<DeadHostState> {
*
* @param timeSupplier a way to supply the current time and allow for unit testing
*/
DeadHostState(TimeSupplier timeSupplier) {
DeadHostState(Supplier<Long> timeSupplier) {
this.failedAttempts = 1;
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}
@ -51,14 +53,14 @@ final class DeadHostState implements Comparable<DeadHostState> {
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
* through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
*
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
*/
DeadHostState(DeadHostState previousDeadHostState) {
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS);
this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = previousDeadHostState.timeSupplier;
}
@ -69,7 +71,7 @@ final class DeadHostState implements Comparable<DeadHostState> {
* @return true if the host should be retried, false otherwise
*/
boolean shallBeRetried() {
return timeSupplier.nanoTime() - deadUntilNanos > 0;
return timeSupplier.get() - deadUntilNanos > 0;
}
/**
@ -87,8 +89,8 @@ final class DeadHostState implements Comparable<DeadHostState> {
@Override
public int compareTo(DeadHostState other) {
if (timeSupplier != other.timeSupplier) {
throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
+ timeSupplier + " != " + other.timeSupplier + "]");
throw new IllegalArgumentException("can't compare DeadHostStates holding different time suppliers as they may " +
"be based on different clocks");
}
return Long.compare(deadUntilNanos, other.deadUntilNanos);
}
@ -101,23 +103,4 @@ final class DeadHostState implements Comparable<DeadHostState> {
", timeSupplier=" + timeSupplier +
'}';
}
/**
* Time supplier that makes timing aspects pluggable to ease testing
*/
interface TimeSupplier {
TimeSupplier DEFAULT = new TimeSupplier() {
@Override
public long nanoTime() {
return System.nanoTime();
}
@Override
public String toString() {
return "nanoTime";
}
};
long nanoTime();
}
}

View File

@ -74,7 +74,6 @@ public final class Request {
*/
public void addParameter(String name, String value) {
Objects.requireNonNull(name, "url parameter name cannot be null");
// .putIfAbsent(name, value) except we are in Java 7 which doesn't have that.
if (parameters.containsKey(name)) {
throw new IllegalArgumentException("url parameter [" + name + "] has already been set to [" + parameters.get(name) + "]");
} else {

View File

@ -19,8 +19,8 @@
package org.elasticsearch.client;
import org.apache.http.message.BasicHeader;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;
@ -38,7 +38,7 @@ public final class RequestOptions {
* Default request options.
*/
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
Collections.emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
private final List<Header> headers;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;

View File

@ -46,7 +46,6 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.DeadHostState.TimeSupplier;
import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
@ -72,6 +71,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
@ -139,7 +139,11 @@ public class RestClient implements Closeable {
* @see Node#Node(HttpHost)
*/
public static RestClientBuilder builder(HttpHost... hosts) {
return new RestClientBuilder(hostsToNodes(hosts));
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList());
return new RestClientBuilder(nodes);
}
/**
@ -163,17 +167,6 @@ public class RestClient implements Closeable {
this.blacklist.clear();
}
private static List<Node> hostsToNodes(HttpHost[] hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = new ArrayList<>(hosts.length);
for (HttpHost host : hosts) {
nodes.add(new Node(host));
}
return nodes;
}
/**
* Get the list of nodes that the client knows about. The list is
* unmodifiable.
@ -369,15 +362,11 @@ public class RestClient implements Closeable {
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
for (Node node : nodeTuple.nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
if (deadness == null) {
if (deadness == null || deadness.shallBeRetried()) {
livingNodes.add(node);
continue;
} else {
deadNodes.add(new DeadNode(node, deadness));
}
if (deadness.shallBeRetried()) {
livingNodes.add(node);
continue;
}
deadNodes.add(new DeadNode(node, deadness));
}
if (false == livingNodes.isEmpty()) {
@ -415,12 +404,7 @@ public class RestClient implements Closeable {
* to compare many things. This saves us a sort on the unfiltered
* list.
*/
nodeSelector.select(new Iterable<Node>() {
@Override
public Iterator<Node> iterator() {
return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
}
});
nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()));
if (false == selectedDeadNodes.isEmpty()) {
return singletonList(Collections.min(selectedDeadNodes).node);
}
@ -447,7 +431,7 @@ public class RestClient implements Closeable {
private void onFailure(Node node) {
while(true) {
DeadHostState previousDeadHostState =
blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER));
if (previousDeadHostState == null) {
if (logger.isDebugEnabled()) {
logger.debug("added [" + node + "] to blacklist");

View File

@ -186,12 +186,8 @@ public final class RestClientBuilder {
if (failureListener == null) {
failureListener = new RestClient.FailureListener();
}
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
@ -218,12 +214,7 @@ public final class RestClientBuilder {
}
final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return finalBuilder.build();
}
});
return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("could not create the default ssl context", e);
}

View File

@ -22,8 +22,6 @@ package org.elasticsearch.client;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.elasticsearch.client.DeadHostState.TimeSupplier;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -38,14 +36,14 @@ public class DeadHostStateTests extends RestClientTestCase {
private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};
public void testInitialDeadHostStateDefaultTimeSupplier() {
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
DeadHostState deadHostState = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
long currentTime = System.nanoTime();
assertThat(deadHostState.getDeadUntilNanos(), greaterThanOrEqualTo(currentTime));
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
}
public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
DeadHostState previous = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous);
@ -58,10 +56,13 @@ public class DeadHostStateTests extends RestClientTestCase {
public void testCompareToTimeSupplier() {
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
final AtomicLong time = new AtomicLong(0);
for (int i = 0; i < numObjects; i++) {
if (i == 0) {
// this test requires a strictly increasing timer
deadHostStates[i] = new DeadHostState(new StrictMonotonicTimeSupplier());
// this test requires a strictly increasing timer. This ensures that even if we call this time supplier in a very tight
// loop we always notice time moving forward. This does not happen for real timer implementations
// (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
deadHostStates[i] = new DeadHostState(time::incrementAndGet);
} else {
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1]);
}
@ -74,42 +75,39 @@ public class DeadHostStateTests extends RestClientTestCase {
public void testCompareToDifferingTimeSupplier() {
try {
new DeadHostState(TimeSupplier.DEFAULT).compareTo(
new DeadHostState(new ConfigurableTimeSupplier()));
new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER).compareTo(
new DeadHostState(() -> 0L));
fail("expected failure");
} catch (IllegalArgumentException e) {
assertEquals("can't compare DeadHostStates with different clocks [nanoTime != configured[0]]",
e.getMessage());
assertEquals("can't compare DeadHostStates holding different time suppliers as they may " +
"be based on different clocks", e.getMessage());
}
}
public void testShallBeRetried() {
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
final AtomicLong time = new AtomicLong(0);
DeadHostState deadHostState = null;
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
timeSupplier.nanoTime = 0;
if (i == 0) {
deadHostState = new DeadHostState(timeSupplier);
deadHostState = new DeadHostState(time::get);
} else {
deadHostState = new DeadHostState(deadHostState);
}
for (int j = 0; j < expectedTimeoutSecond; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
assertThat(deadHostState.shallBeRetried(), is(false));
}
int iters = randomIntBetween(5, 30);
for (int j = 0; j < iters; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
assertThat(deadHostState.shallBeRetried(), is(true));
}
}
}
public void testDeadHostStateTimeouts() {
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
zeroTimeSupplier.nanoTime = 0L;
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
DeadHostState previous = new DeadHostState(() -> 0L);
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
previous = new DeadHostState(previous);
@ -123,37 +121,4 @@ public class DeadHostStateTests extends RestClientTestCase {
previous = deadHostState;
}
}
static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
long nanoTime;
@Override
public long nanoTime() {
return nanoTime;
}
@Override
public String toString() {
return "configured[" + nanoTime + "]";
}
}
/**
* Simulates a monotonically strict increasing time (i.e. the value increases on every call to <code>#nanoTime()</code>). This ensures
* that even if we call this time supplier in a very tight loop we always notice time moving forward. This does not happen for real
* timer implementations (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
*/
static class StrictMonotonicTimeSupplier implements DeadHostState.TimeSupplier {
private final AtomicLong time = new AtomicLong(0);
@Override
public long nanoTime() {
return time.incrementAndGet();
}
@Override
public String toString() {
return "strict monotonic[" + time.get() + "]";
}
}
}

View File

@ -25,7 +25,6 @@ import org.apache.http.client.AuthCache;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.elasticsearch.client.DeadHostStateTests.ConfigurableTimeSupplier;
import org.elasticsearch.client.RestClient.NodeTuple;
import java.io.IOException;
@ -40,6 +39,8 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.instanceOf;
@ -266,14 +267,15 @@ public class RestClientTests extends RestClientTestCase {
// Mark all the nodes dead for a few test cases
{
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
final AtomicLong time = new AtomicLong(0L);
Supplier<Long> timeSupplier = time::get;
Map<HttpHost, DeadHostState> blacklist = new HashMap<>();
blacklist.put(n1.getHost(), new DeadHostState(timeSupplier));
blacklist.put(n2.getHost(), new DeadHostState(new DeadHostState(timeSupplier)));
blacklist.put(n3.getHost(), new DeadHostState(new DeadHostState(new DeadHostState(timeSupplier))));
/*
* case when fewer nodeTuple than blacklist, wont result in any IllegalCapacityException
* case when fewer nodeTuple than blacklist, won't result in any IllegalCapacityException
*/
{
NodeTuple<List<Node>> fewerNodeTuple = new NodeTuple<>(Arrays.asList(n1, n2), null);
@ -282,7 +284,7 @@ public class RestClientTests extends RestClientTestCase {
}
/*
* selectHosts will revive a single host if regardless of
* selectHosts will revive a single host regardless of
* blacklist time. It'll revive the node that is closest
* to being revived that the NodeSelector is ok with.
*/
@ -304,7 +306,7 @@ public class RestClientTests extends RestClientTestCase {
* Now lets wind the clock forward, past the timeout for one of
* the dead nodes. We should return it.
*/
timeSupplier.nanoTime = new DeadHostState(timeSupplier).getDeadUntilNanos();
time.set(new DeadHostState(timeSupplier).getDeadUntilNanos());
assertSelectLivingHosts(Arrays.asList(n1), nodeTuple, blacklist, NodeSelector.ANY);
/*
@ -318,7 +320,7 @@ public class RestClientTests extends RestClientTestCase {
* blacklist timeouts then we function as though the nodes aren't
* in the blacklist at all.
*/
timeSupplier.nanoTime += DeadHostState.MAX_CONNECTION_TIMEOUT_NANOS;
time.addAndGet(DeadHostState.MAX_CONNECTION_TIMEOUT_NANOS);
assertSelectLivingHosts(Arrays.asList(n1, n2, n3), nodeTuple, blacklist, NodeSelector.ANY);
assertSelectLivingHosts(Arrays.asList(n2, n3), nodeTuple, blacklist, not1);
}

View File

@ -14,7 +14,7 @@ The javadoc for the low level REST client can be found at {rest-client-javadoc}/
The low-level Java REST client is hosted on
http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.elasticsearch.client%22[Maven
Central]. The minimum Java version required is `1.7`.
Central]. The minimum Java version required is `1.8`.
The low-level REST client is subject to the same release cycle as
Elasticsearch. Replace the version with the desired client version, first