Add test for low-level client round-robin behaviour (#31616)
This commit is contained in:
parent
0b1a0641ef
commit
48cfb9b0db
|
@ -615,16 +615,16 @@ public class RestClient implements Closeable {
|
||||||
*/
|
*/
|
||||||
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
|
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
|
||||||
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
|
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
|
||||||
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
|
Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
|
||||||
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
|
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Select hosts to try. Package private for testing.
|
* Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
|
||||||
|
* if the previous attempt failed and so on. Package private for testing.
|
||||||
*/
|
*/
|
||||||
static List<Node> selectHosts(NodeTuple<List<Node>> nodeTuple,
|
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
|
||||||
Map<HttpHost, DeadHostState> blacklist, AtomicInteger lastNodeIndex,
|
AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
|
||||||
NodeSelector nodeSelector) throws IOException {
|
|
||||||
/*
|
/*
|
||||||
* Sort the nodes into living and dead lists.
|
* Sort the nodes into living and dead lists.
|
||||||
*/
|
*/
|
||||||
|
@ -653,8 +653,8 @@ public class RestClient implements Closeable {
|
||||||
nodeSelector.select(selectedLivingNodes);
|
nodeSelector.select(selectedLivingNodes);
|
||||||
if (false == selectedLivingNodes.isEmpty()) {
|
if (false == selectedLivingNodes.isEmpty()) {
|
||||||
/*
|
/*
|
||||||
* Rotate the list so subsequent requests will prefer the
|
* Rotate the list using a global counter as the distance so subsequent
|
||||||
* nodes in a different order.
|
* requests will try the nodes in a different order.
|
||||||
*/
|
*/
|
||||||
Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
|
Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
|
||||||
return selectedLivingNodes;
|
return selectedLivingNodes;
|
||||||
|
@ -662,15 +662,13 @@ public class RestClient implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Last resort: If there are no good nodes to use, either because
|
* Last resort: there are no good nodes to use, either because
|
||||||
* the selector rejected all the living nodes or because there aren't
|
* the selector rejected all the living nodes or because there aren't
|
||||||
* any living ones. Either way, we want to revive a single dead node
|
* any living ones. Either way, we want to revive a single dead node
|
||||||
* that the NodeSelectors are OK with. We do this by sorting the dead
|
* that the NodeSelectors are OK with. We do this by passing the dead
|
||||||
* nodes by their revival time and passing them through the
|
* nodes through the NodeSelector so it can have its say in which nodes
|
||||||
* NodeSelector so it can have its say in which nodes are ok and their
|
* are ok. If the selector is ok with any of the nodes then we will take
|
||||||
* ordering. If the selector is ok with any of the nodes then use just
|
* the one in the list that has the lowest revival time and try it.
|
||||||
* the first one in the list because we only want to revive a single
|
|
||||||
* node.
|
|
||||||
*/
|
*/
|
||||||
if (false == deadNodes.isEmpty()) {
|
if (false == deadNodes.isEmpty()) {
|
||||||
final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
|
final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
|
||||||
|
|
|
@ -21,6 +21,9 @@ package org.elasticsearch.client;
|
||||||
|
|
||||||
import org.apache.http.Header;
|
import org.apache.http.Header;
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
|
import org.apache.http.client.AuthCache;
|
||||||
|
import org.apache.http.impl.auth.BasicScheme;
|
||||||
|
import org.apache.http.impl.client.BasicAuthCache;
|
||||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||||
import org.elasticsearch.client.DeadHostStateTests.ConfigurableTimeSupplier;
|
import org.elasticsearch.client.DeadHostStateTests.ConfigurableTimeSupplier;
|
||||||
import org.elasticsearch.client.RestClient.NodeTuple;
|
import org.elasticsearch.client.RestClient.NodeTuple;
|
||||||
|
@ -35,13 +38,14 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
|
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -407,8 +411,8 @@ public class RestClientTests extends RestClientTestCase {
|
||||||
* blacklist time. It'll revive the node that is closest
|
* blacklist time. It'll revive the node that is closest
|
||||||
* to being revived that the NodeSelector is ok with.
|
* to being revived that the NodeSelector is ok with.
|
||||||
*/
|
*/
|
||||||
assertEquals(singletonList(n1), RestClient.selectHosts(nodeTuple, blacklist, new AtomicInteger(), NodeSelector.ANY));
|
assertEquals(singletonList(n1), RestClient.selectNodes(nodeTuple, blacklist, new AtomicInteger(), NodeSelector.ANY));
|
||||||
assertEquals(singletonList(n2), RestClient.selectHosts(nodeTuple, blacklist, new AtomicInteger(), not1));
|
assertEquals(singletonList(n2), RestClient.selectNodes(nodeTuple, blacklist, new AtomicInteger(), not1));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try a NodeSelector that excludes all nodes. This should
|
* Try a NodeSelector that excludes all nodes. This should
|
||||||
|
@ -449,23 +453,23 @@ public class RestClientTests extends RestClientTestCase {
|
||||||
Map<HttpHost, DeadHostState> blacklist, NodeSelector nodeSelector) throws IOException {
|
Map<HttpHost, DeadHostState> blacklist, NodeSelector nodeSelector) throws IOException {
|
||||||
int iterations = 1000;
|
int iterations = 1000;
|
||||||
AtomicInteger lastNodeIndex = new AtomicInteger(0);
|
AtomicInteger lastNodeIndex = new AtomicInteger(0);
|
||||||
assertEquals(expectedNodes, RestClient.selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
|
assertEquals(expectedNodes, RestClient.selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
|
||||||
// Calling it again rotates the set of results
|
// Calling it again rotates the set of results
|
||||||
for (int i = 1; i < iterations; i++) {
|
for (int i = 1; i < iterations; i++) {
|
||||||
Collections.rotate(expectedNodes, 1);
|
Collections.rotate(expectedNodes, 1);
|
||||||
assertEquals("iteration " + i, expectedNodes,
|
assertEquals("iteration " + i, expectedNodes,
|
||||||
RestClient.selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
|
RestClient.selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert that {@link RestClient#selectHosts} fails on the provided arguments.
|
* Assert that {@link RestClient#selectNodes} fails on the provided arguments.
|
||||||
* @return the message in the exception thrown by the failure
|
* @return the message in the exception thrown by the failure
|
||||||
*/
|
*/
|
||||||
private String assertSelectAllRejected( NodeTuple<List<Node>> nodeTuple,
|
private static String assertSelectAllRejected( NodeTuple<List<Node>> nodeTuple,
|
||||||
Map<HttpHost, DeadHostState> blacklist, NodeSelector nodeSelector) {
|
Map<HttpHost, DeadHostState> blacklist, NodeSelector nodeSelector) {
|
||||||
try {
|
try {
|
||||||
RestClient.selectHosts(nodeTuple, blacklist, new AtomicInteger(0), nodeSelector);
|
RestClient.selectNodes(nodeTuple, blacklist, new AtomicInteger(0), nodeSelector);
|
||||||
throw new AssertionError("expected selectHosts to fail");
|
throw new AssertionError("expected selectHosts to fail");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return e.getMessage();
|
return e.getMessage();
|
||||||
|
@ -478,5 +482,56 @@ public class RestClientTests extends RestClientTestCase {
|
||||||
new Header[] {}, nodes, null, null, null);
|
new Header[] {}, nodes, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRoundRobin() throws IOException {
|
||||||
|
int numNodes = randomIntBetween(2, 10);
|
||||||
|
AuthCache authCache = new BasicAuthCache();
|
||||||
|
List<Node> nodes = new ArrayList<>(numNodes);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
Node node = new Node(new HttpHost("localhost", 9200 + i));
|
||||||
|
nodes.add(node);
|
||||||
|
authCache.put(node.getHost(), new BasicScheme());
|
||||||
|
}
|
||||||
|
NodeTuple<List<Node>> nodeTuple = new NodeTuple<>(nodes, authCache);
|
||||||
|
|
||||||
|
//test the transition from negative to positive values
|
||||||
|
AtomicInteger lastNodeIndex = new AtomicInteger(-numNodes);
|
||||||
|
assertNodes(nodeTuple, lastNodeIndex, 50);
|
||||||
|
assertEquals(-numNodes + 50, lastNodeIndex.get());
|
||||||
|
|
||||||
|
//test the highest positive values up to MAX_VALUE
|
||||||
|
lastNodeIndex.set(Integer.MAX_VALUE - numNodes * 10);
|
||||||
|
assertNodes(nodeTuple, lastNodeIndex, numNodes * 10);
|
||||||
|
assertEquals(Integer.MAX_VALUE, lastNodeIndex.get());
|
||||||
|
|
||||||
|
//test the transition from MAX_VALUE to MIN_VALUE
|
||||||
|
//this is the only time where there is most likely going to be a jump from a node
|
||||||
|
//to another one that's not necessarily the next one.
|
||||||
|
assertEquals(Integer.MIN_VALUE, lastNodeIndex.incrementAndGet());
|
||||||
|
assertNodes(nodeTuple, lastNodeIndex, 50);
|
||||||
|
assertEquals(Integer.MIN_VALUE + 50, lastNodeIndex.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertNodes(NodeTuple<List<Node>> nodeTuple, AtomicInteger lastNodeIndex, int runs) throws IOException {
|
||||||
|
int distance = lastNodeIndex.get() % nodeTuple.nodes.size();
|
||||||
|
/*
|
||||||
|
* Collections.rotate is not super intuitive: distance 1 means that the last element will become the first and so on,
|
||||||
|
* while distance -1 means that the second element will become the first and so on.
|
||||||
|
*/
|
||||||
|
int expectedOffset = distance > 0 ? nodeTuple.nodes.size() - distance : Math.abs(distance);
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
Iterable<Node> selectedNodes = RestClient.selectNodes(nodeTuple, Collections.<HttpHost, DeadHostState>emptyMap(),
|
||||||
|
lastNodeIndex, NodeSelector.ANY);
|
||||||
|
List<Node> expectedNodes = nodeTuple.nodes;
|
||||||
|
int index = 0;
|
||||||
|
for (Node actualNode : selectedNodes) {
|
||||||
|
Node expectedNode = expectedNodes.get((index + expectedOffset) % expectedNodes.size());
|
||||||
|
assertSame(expectedNode, actualNode);
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
expectedOffset--;
|
||||||
|
if (expectedOffset < 0) {
|
||||||
|
expectedOffset += nodeTuple.nodes.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue