Node selector per client rather than per request (#31471)

We have made node selectors configurable per request, but all 
of other language clients don't allow for that.
A good reason not to do so, is that having a different node selector 
per request breaks round-robin. This commit makes NodeSelector 
configurable only at client initialization. It also improves the docs 
on this matter, important given that a single node selector can still 
affect round-robin.
This commit is contained in:
Luca Cavanna 2018-06-22 17:15:29 +02:00 committed by GitHub
parent 59e7c6411a
commit 16e4e7a7cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 208 additions and 148 deletions

View File

@ -24,7 +24,7 @@ import java.util.Iterator;
/**
* Selects nodes that can receive requests. Used to keep requests away
* from master nodes or to send them to nodes with a particular attribute.
* Use with {@link RequestOptions.Builder#setNodeSelector(NodeSelector)}.
* Use with {@link RestClientBuilder#setNodeSelector(NodeSelector)}.
*/
public interface NodeSelector {
/**
@ -68,7 +68,7 @@ public interface NodeSelector {
* have the {@code master} role OR it has the data {@code data}
* role.
*/
NodeSelector NOT_MASTER_ONLY = new NodeSelector() {
NodeSelector SKIP_DEDICATED_MASTERS = new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
@ -84,7 +84,7 @@ public interface NodeSelector {
@Override
public String toString() {
return "NOT_MASTER_ONLY";
return "SKIP_DEDICATED_MASTERS";
}
};
}

View File

@ -37,22 +37,18 @@ import java.util.ArrayList;
*/
public final class RequestOptions {
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), NodeSelector.ANY,
HeapBufferedResponseConsumerFactory.DEFAULT).build();
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT).build();
private final List<Header> headers;
private final NodeSelector nodeSelector;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private RequestOptions(Builder builder) {
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
this.nodeSelector = builder.nodeSelector;
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
}
public Builder toBuilder() {
Builder builder = new Builder(headers, nodeSelector, httpAsyncResponseConsumerFactory);
return builder;
return new Builder(headers, httpAsyncResponseConsumerFactory);
}
/**
@ -62,14 +58,6 @@ public final class RequestOptions {
return headers;
}
/**
* The selector that chooses which nodes are valid destinations for
* {@link Request}s with these options.
*/
public NodeSelector getNodeSelector() {
return nodeSelector;
}
/**
* The {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
@ -93,9 +81,6 @@ public final class RequestOptions {
b.append(headers.get(h).toString());
}
}
if (nodeSelector != NodeSelector.ANY) {
b.append(", nodeSelector=").append(nodeSelector);
}
if (httpAsyncResponseConsumerFactory != HttpAsyncResponseConsumerFactory.DEFAULT) {
b.append(", consumerFactory=").append(httpAsyncResponseConsumerFactory);
}
@ -113,24 +98,20 @@ public final class RequestOptions {
RequestOptions other = (RequestOptions) obj;
return headers.equals(other.headers)
&& nodeSelector.equals(other.nodeSelector)
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory);
}
@Override
public int hashCode() {
return Objects.hash(headers, nodeSelector, httpAsyncResponseConsumerFactory);
return Objects.hash(headers, httpAsyncResponseConsumerFactory);
}
public static class Builder {
private final List<Header> headers;
private NodeSelector nodeSelector;
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private Builder(List<Header> headers, NodeSelector nodeSelector,
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
this.headers = new ArrayList<>(headers);
this.nodeSelector = nodeSelector;
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
}
@ -150,14 +131,6 @@ public final class RequestOptions {
this.headers.add(new ReqHeader(name, value));
}
/**
* Configure the selector that chooses which nodes are valid
* destinations for {@link Request}s with these options
*/
public void setNodeSelector(NodeSelector nodeSelector) {
this.nodeSelector = Objects.requireNonNull(nodeSelector, "nodeSelector cannot be null");
}
/**
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the

View File

@ -48,6 +48,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.DeadHostState.TimeSupplier;
import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
@ -74,7 +75,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLHandshakeException;
import static java.util.Collections.singletonList;
@ -108,15 +108,17 @@ public class RestClient implements Closeable {
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
private final FailureListener failureListener;
private final NodeSelector nodeSelector;
private volatile NodeTuple<List<Node>> nodeTuple;
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
List<Node> nodes, String pathPrefix, FailureListener failureListener) {
List<Node> nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector) {
this.client = client;
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
setNodes(nodes);
}
@ -146,7 +148,7 @@ public class RestClient implements Closeable {
/**
* Replaces the hosts with which the client communicates.
*
* @deprecated prefer {@link setNodes} because it allows you
* @deprecated prefer {@link #setNodes(Collection)} because it allows you
* to set metadata for use with {@link NodeSelector}s
*/
@Deprecated
@ -180,8 +182,8 @@ public class RestClient implements Closeable {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = new ArrayList<>(hosts.length);
for (int i = 0; i < hosts.length; i++) {
nodes.add(new Node(hosts[i]));
for (HttpHost host : hosts) {
nodes.add(new Node(host));
}
return nodes;
}
@ -509,7 +511,7 @@ public class RestClient implements Closeable {
setHeaders(httpRequest, request.getOptions().getHeaders());
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
long startTime = System.nanoTime();
performRequestAsync(startTime, nextNode(request.getOptions().getNodeSelector()), httpRequest, ignoreErrorCodes,
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
}
@ -611,7 +613,7 @@ public class RestClient implements Closeable {
* that is closest to being revived.
* @throws IOException if no nodes are available
*/
private NodeTuple<Iterator<Node>> nextNode(NodeSelector nodeSelector) throws IOException {
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);

View File

@ -55,6 +55,7 @@ public final class RestClientBuilder {
private HttpClientConfigCallback httpClientConfigCallback;
private RequestConfigCallback requestConfigCallback;
private String pathPrefix;
private NodeSelector nodeSelector = NodeSelector.ANY;
/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
@ -173,6 +174,16 @@ public final class RestClientBuilder {
return this;
}
/**
* Sets the {@link NodeSelector} to be used for all requests.
* @throws NullPointerException if the provided nodeSelector is null
*/
public RestClientBuilder setNodeSelector(NodeSelector nodeSelector) {
Objects.requireNonNull(nodeSelector, "nodeSelector must not be null");
this.nodeSelector = nodeSelector;
return this;
}
/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
@ -186,7 +197,8 @@ public final class RestClientBuilder {
return createHttpClient();
}
});
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, pathPrefix, failureListener);
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector);
httpClient.start();
return restClient;
}

View File

@ -59,7 +59,7 @@ public class NodeSelectorTests extends RestClientTestCase {
Collections.shuffle(nodes, getRandom());
List<Node> expected = new ArrayList<>(nodes);
expected.remove(masterOnly);
NodeSelector.NOT_MASTER_ONLY.select(nodes);
NodeSelector.SKIP_DEDICATED_MASTERS.select(nodes);
assertEquals(expected, nodes);
}

View File

@ -114,10 +114,6 @@ public class RequestOptionsTests extends RestClientTestCase {
}
}
if (randomBoolean()) {
builder.setNodeSelector(mock(NodeSelector.class));
}
if (randomBoolean()) {
builder.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(1));
}
@ -131,15 +127,12 @@ public class RequestOptionsTests extends RestClientTestCase {
private static RequestOptions mutate(RequestOptions options) {
RequestOptions.Builder mutant = options.toBuilder();
int mutationType = between(0, 2);
int mutationType = between(0, 1);
switch (mutationType) {
case 0:
mutant.addHeader("extra", "m");
return mutant.build();
case 1:
mutant.setNodeSelector(mock(NodeSelector.class));
return mutant.build();
case 2:
mutant.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(5));
return mutant.build();
default:

View File

@ -75,14 +75,15 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
httpServers[i] = httpServer;
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
}
restClient = buildRestClient();
restClient = buildRestClient(NodeSelector.ANY);
}
private static RestClient buildRestClient() {
private static RestClient buildRestClient(NodeSelector nodeSelector) {
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClientBuilder.setNodeSelector(nodeSelector);
return restClientBuilder.build();
}
@ -199,29 +200,28 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
* test what happens after calling
*/
public void testNodeSelector() throws IOException {
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(firstPositionNodeSelector());
request.setOptions(options);
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
* Run the request more than once to verify that the
* NodeSelector overrides the round robin behavior.
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// Windows isn't consistent here. Sometimes the message is even null!
if (false == System.getProperty("os.name").startsWith("Windows")) {
assertEquals("Connection refused", e.getMessage());
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
Request request = new Request("GET", "/200");
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
* Run the request more than once to verify that the
* NodeSelector overrides the round robin behavior.
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// Windows isn't consistent here. Sometimes the message is even null!
if (false == System.getProperty("os.name").startsWith("Windows")) {
assertEquals("Connection refused", e.getMessage());
}
}
} else {
Response response = restClient.performRequest(request);
assertEquals(httpHosts[0], response.getHost());
}
} else {
Response response = restClient.performRequest(request);
assertEquals(httpHosts[0], response.getHost());
}
}
}

View File

@ -35,9 +35,7 @@ import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.Node.Roles;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -74,13 +72,11 @@ import static org.mockito.Mockito.when;
public class RestClientMultipleHostsTests extends RestClientTestCase {
private ExecutorService exec = Executors.newFixedThreadPool(1);
private RestClient restClient;
private List<Node> nodes;
private HostsTrackingFailureListener failureListener;
@Before
@SuppressWarnings("unchecked")
public void createRestClient() throws IOException {
public RestClient createRestClient(NodeSelector nodeSelector) {
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
@ -119,7 +115,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
nodes = Collections.unmodifiableList(nodes);
failureListener = new HostsTrackingFailureListener();
restClient = new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener);
return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector);
}
/**
@ -131,12 +127,13 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
public void testRoundRobinOkStatusCodes() throws IOException {
RestClient restClient = createRestClient(NodeSelector.ANY);
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
for (int i = 0; i < numIters; i++) {
Set<HttpHost> hostsSet = hostsSet();
for (int j = 0; j < nodes.size(); j++) {
int statusCode = randomOkStatusCode(getRandom());
Response response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode);
Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
assertEquals(statusCode, response.getStatusLine().getStatusCode());
assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost()));
}
@ -146,6 +143,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
public void testRoundRobinNoRetryErrors() throws IOException {
RestClient restClient = createRestClient(NodeSelector.ANY);
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
for (int i = 0; i < numIters; i++) {
Set<HttpHost> hostsSet = hostsSet();
@ -153,7 +151,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
String method = randomHttpMethod(getRandom());
int statusCode = randomErrorNoRetryStatusCode(getRandom());
try {
Response response = restClient.performRequest(method, "/" + statusCode);
Response response = restClient.performRequest(new Request(method, "/" + statusCode));
if (method.equals("HEAD") && statusCode == 404) {
//no exception gets thrown although we got a 404
assertEquals(404, response.getStatusLine().getStatusCode());
@ -178,9 +176,10 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
public void testRoundRobinRetryErrors() throws IOException {
RestClient restClient = createRestClient(NodeSelector.ANY);
String retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
/*
@ -237,7 +236,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
for (int j = 0; j < nodes.size(); j++) {
retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
Response response = e.getResponse();
@ -269,7 +268,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
int statusCode = randomErrorNoRetryStatusCode(getRandom());
Response response;
try {
response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode);
response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
} catch (ResponseException e) {
response = e.getResponse();
}
@ -286,7 +285,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
for (int y = 0; y < i + 1; y++) {
retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
Response response = e.getResponse();
@ -323,6 +322,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
assertTrue(found);
}
};
RestClient restClient = createRestClient(firstPositionOnly);
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
@ -330,18 +330,16 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
* NodeSelector overrides the round robin behavior.
*/
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(firstPositionOnly);
request.setOptions(options);
Response response = restClient.performRequest(request);
assertEquals(nodes.get(0).getHost(), response.getHost());
}
}
public void testSetNodes() throws IOException {
RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS);
List<Node> newNodes = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
Roles roles = i == 0 ? new Roles(false, true, true) : new Roles(true, false, false);
Node.Roles roles = i == 0 ? new Node.Roles(false, true, true) : new Node.Roles(true, false, false);
newNodes.add(new Node(nodes.get(i).getHost(), null, null, null, roles, null));
}
restClient.setNodes(newNodes);
@ -352,9 +350,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
* NodeSelector overrides the round robin behavior.
*/
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(NodeSelector.NOT_MASTER_ONLY);
request.setOptions(options);
Response response = restClient.performRequest(request);
assertEquals(newNodes.get(0).getHost(), response.getHost());
}

View File

@ -150,7 +150,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
node = new Node(new HttpHost("localhost", 9200));
failureListener = new HostsTrackingFailureListener();
restClient = new RestClient(httpClient, 10000, defaultHeaders,
singletonList(node), null, failureListener);
singletonList(node), null, failureListener, NodeSelector.ANY);
}
/**

View File

@ -54,7 +54,7 @@ public class RestClientTests extends RestClientTestCase {
public void testCloseIsIdempotent() throws IOException {
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null);
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null);
restClient.close();
verify(closeableHttpAsyncClient, times(1)).close();
restClient.close();
@ -475,7 +475,7 @@ public class RestClientTests extends RestClientTestCase {
private static RestClient createRestClient() {
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000),
new Header[] {}, nodes, null, null);
new Header[] {}, nodes, null, null, null);
}

View File

@ -36,7 +36,6 @@ import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.HasAttributeNodeSelector;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
@ -54,6 +53,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
/**
@ -82,8 +82,7 @@ public class RestClientDocumentation {
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.addHeader("Authorization", "Bearer " + TOKEN); // <1>
builder.setNodeSelector(NodeSelector.NOT_MASTER_ONLY); // <2>
builder.setHttpAsyncResponseConsumerFactory( // <3>
builder.setHttpAsyncResponseConsumerFactory( // <2>
new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
@ -115,6 +114,45 @@ public class RestClientDocumentation {
builder.setMaxRetryTimeoutMillis(10000); // <1>
//end::rest-client-init-max-retry-timeout
}
{
//tag::rest-client-init-node-selector
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS); // <1>
//end::rest-client-init-node-selector
}
{
//tag::rest-client-init-allocation-aware-selector
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(new NodeSelector() { // <1>
@Override
public void select(Iterable<Node> nodes) {
/*
* Prefer any node that belongs to rack_one. If none is around
* we will go to another rack till it's time to try and revive
* some of the nodes that belong to rack_one.
*/
boolean foundOne = false;
for (Node node : nodes) {
String rackId = node.getAttributes().get("rack_id").get(0);
if ("rack_one".equals(rackId)) {
foundOne = true;
break;
}
}
if (foundOne) {
Iterator<Node> nodesIt = nodes.iterator();
while (nodesIt.hasNext()) {
Node node = nodesIt.next();
String rackId = node.getAttributes().get("rack_id").get(0);
if ("rack_one".equals(rackId) == false) {
nodesIt.remove();
}
}
}
}
});
//end::rest-client-init-allocation-aware-selector
}
{
//tag::rest-client-init-failure-listener
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
@ -198,13 +236,6 @@ public class RestClientDocumentation {
request.setOptions(options);
//end::rest-client-options-customize-header
}
{
//tag::rest-client-options-customize-attribute
RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
options.setNodeSelector(new HasAttributeNodeSelector("rack", "c12")); // <1>
request.setOptions(options);
//end::rest-client-options-customize-attribute
}
}
{
HttpEntity[] documents = new HttpEntity[10];

View File

@ -99,3 +99,30 @@ http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`netwo
to your
http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java
security policy].
=== Node selector
The client sends each request to one of the configured nodes in round-robin
fashion. Nodes can optionally be filtered through a node selector that needs
to be provided when initializing the client. This is useful when sniffing is
enabled, in case only dedicated master nodes should be hit by HTTP requests.
For each request the client will run the eventually configured node selector
to filter the node candidates, then select the next one in the list out of the
remaining ones.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-allocation-aware-selector]
--------------------------------------------------
<1> Set an allocation aware node selector that allows to pick a node in the
local rack if any available, otherwise go to any other node in any rack. It
acts as a preference rather than a strict requirement, given that it goes to
another rack if none of the local nodes are available, rather than returning
no nodes in such case which would make the client forcibly revive a local node
whenever none of the nodes from the preferred rack is available.
WARNING: Node selectors that do not consistently select the same set of nodes
will make round-robin behaviour unpredictable and possibly unfair. The
preference example above is fine as it reasons about availability of nodes
which already affects the predictability of round-robin. Node selection should
not depend on other external factors or round-robin will not work properly.

View File

@ -196,6 +196,16 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failur
<1> Set a listener that gets notified every time a node fails, in case actions
need to be taken. Used internally when sniffing on failure is enabled.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-node-selector]
--------------------------------------------------
<1> Set the node selector to be used to filter the nodes the client will send
requests to among the ones that are set to the client itself. This is useful
for instance to prevent sending requests to dedicated master nodes when
sniffing is enabled. By default the client sends requests to every configured
node.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-request-config-callback]
@ -283,8 +293,7 @@ instance and share it between all requests:
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-singleton]
--------------------------------------------------
<1> Add any headers needed by all requests.
<2> Set a `NodeSelector`.
<3> Customize the response consumer.
<2> Customize the response consumer.
`addHeader` is for headers that are required for authorization or to work with
a proxy in front of Elasticsearch. There is no need to set the `Content-Type`
@ -315,15 +324,6 @@ adds an extra header:
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-customize-header]
--------------------------------------------------
Or you can send requests to nodes with a particular attribute:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-customize-attribute]
--------------------------------------------------
<1> Replace the node selector with one that selects nodes on a particular rack.
==== Multiple parallel asynchronous actions
The client is quite happy to execute many actions in parallel. The following

View File

@ -91,8 +91,9 @@ public class DocsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
final RestClient restClient,
final List<HttpHost> hosts,
final Version esVersion,
final Version masterVersion) throws IOException {
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion);
final Version masterVersion) {
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion,
restClientBuilder -> configureClient(restClientBuilder, restClientSettings()));
}
/**

View File

@ -30,7 +30,6 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@ -47,6 +46,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
@ -381,6 +381,11 @@ public abstract class ESRestTestCase extends ESTestCase {
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
RestClientBuilder builder = RestClient.builder(hosts);
configureClient(builder, settings);
return builder.build();
}
protected static void configureClient(RestClientBuilder builder, Settings settings) throws IOException {
String keystorePath = settings.get(TRUSTSTORE_PATH);
if (keystorePath != null) {
final String keystorePass = settings.get(TRUSTSTORE_PASSWORD);
@ -399,11 +404,10 @@ public abstract class ESRestTestCase extends ESTestCase {
SSLContext sslcontext = SSLContexts.custom().loadTrustMaterial(keyStore, null).build();
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslcontext);
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLStrategy(sessionStrategy));
} catch (KeyStoreException|NoSuchAlgorithmException|KeyManagementException|CertificateException e) {
} catch (KeyStoreException |NoSuchAlgorithmException |KeyManagementException |CertificateException e) {
throw new RuntimeException("Error setting up ssl", e);
}
}
try (ThreadContext threadContext = new ThreadContext(settings)) {
Header[] defaultHeaders = new Header[threadContext.getHeaders().size()];
int i = 0;
@ -412,7 +416,6 @@ public abstract class ESRestTestCase extends ESTestCase {
}
builder.setDefaultHeaders(defaultHeaders);
}
final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT);
if (requestTimeoutString != null) {
final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT);
@ -423,7 +426,6 @@ public abstract class ESRestTestCase extends ESTestCase {
final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT);
builder.setRequestConfigCallback(conf -> conf.setSocketTimeout(Math.toIntExact(socketTimeout.getMillis())));
}
return builder.build();
}
@SuppressWarnings("unchecked")

View File

@ -27,6 +27,8 @@ import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
import java.io.IOException;
@ -47,8 +49,9 @@ public final class ClientYamlDocsTestClient extends ClientYamlTestClient {
final RestClient restClient,
final List<HttpHost> hosts,
final Version esVersion,
final Version masterVersion) throws IOException {
super(restSpec, restClient, hosts, esVersion, masterVersion);
final Version masterVersion,
final CheckedConsumer<RestClientBuilder, IOException> clientBuilderConsumer) {
super(restSpec, restClient, hosts, esVersion, masterVersion, clientBuilderConsumer);
}
@Override
@ -66,9 +69,9 @@ public final class ClientYamlDocsTestClient extends ClientYamlTestClient {
request.addParameter(param.getKey(), param.getValue());
}
request.setEntity(entity);
setOptions(request, headers, nodeSelector);
setOptions(request, headers);
try {
Response response = restClient.performRequest(request);
Response response = getRestClient(nodeSelector).performRequest(request);
return new ClientYamlTestResponse(response);
} catch (ResponseException e) {
throw new ClientYamlTestResponseException(e);

View File

@ -26,18 +26,22 @@ import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestPath;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@ -58,21 +62,24 @@ public class ClientYamlTestClient {
private static final ContentType YAML_CONTENT_TYPE = ContentType.create("application/yaml");
private final ClientYamlSuiteRestSpec restSpec;
protected final RestClient restClient;
protected final Map<NodeSelector, RestClient> restClients = new HashMap<>();
private final Version esVersion;
private final Version masterVersion;
private final CheckedConsumer<RestClientBuilder, IOException> clientBuilderConsumer;
public ClientYamlTestClient(
final ClientYamlSuiteRestSpec restSpec,
final RestClient restClient,
final List<HttpHost> hosts,
final Version esVersion,
final Version masterVersion) throws IOException {
final Version masterVersion,
final CheckedConsumer<RestClientBuilder, IOException> clientBuilderConsumer) {
assert hosts.size() > 0;
this.restSpec = restSpec;
this.restClient = restClient;
this.restClients.put(NodeSelector.ANY, restClient);
this.esVersion = esVersion;
this.masterVersion = masterVersion;
this.clientBuilderConsumer = clientBuilderConsumer;
}
public Version getEsVersion() {
@ -172,30 +179,43 @@ public class ClientYamlTestClient {
requestPath = finalPath.toString();
}
logger.debug("calling api [{}]", apiName);
Request request = new Request(requestMethod, requestPath);
for (Map.Entry<String, String> param : queryStringParams.entrySet()) {
request.addParameter(param.getKey(), param.getValue());
}
request.setEntity(entity);
setOptions(request, headers, nodeSelector);
setOptions(request, headers);
try {
Response response = restClient.performRequest(request);
Response response = getRestClient(nodeSelector).performRequest(request);
return new ClientYamlTestResponse(response);
} catch(ResponseException e) {
throw new ClientYamlTestResponseException(e);
}
}
protected static void setOptions(Request request, Map<String, String> headers, NodeSelector nodeSelector) {
protected RestClient getRestClient(NodeSelector nodeSelector) {
//lazily build a new client in case we need to point to some specific node
return restClients.computeIfAbsent(nodeSelector, selector -> {
RestClient anyClient = restClients.get(NodeSelector.ANY);
RestClientBuilder builder = RestClient.builder(anyClient.getNodes().toArray(new Node[0]));
try {
clientBuilderConsumer.accept(builder);
} catch(IOException e) {
throw new UncheckedIOException(e);
}
builder.setNodeSelector(nodeSelector);
return builder.build();
});
}
protected static void setOptions(Request request, Map<String, String> headers) {
RequestOptions.Builder options = request.getOptions().toBuilder();
for (Map.Entry<String, String> header : headers.entrySet()) {
logger.debug("Adding header {} with value {}", header.getKey(), header.getValue());
options.addHeader(header.getKey(), header.getValue());
}
options.setNodeSelector(nodeSelector);
request.setOptions(options);
}

View File

@ -47,6 +47,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -122,7 +123,7 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
public void initAndResetContext() throws Exception {
if (restTestExecutionContext == null) {
// Sniff host metadata in case we need it in the yaml tests
List<Node> nodesWithMetadata = sniffHostMetadata(adminClient());
List<Node> nodesWithMetadata = sniffHostMetadata();
client().setNodes(nodesWithMetadata);
adminClient().setNodes(nodesWithMetadata);
@ -163,8 +164,9 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
final RestClient restClient,
final List<HttpHost> hosts,
final Version esVersion,
final Version masterVersion) throws IOException {
return new ClientYamlTestClient(restSpec, restClient, hosts, esVersion, masterVersion);
final Version masterVersion) {
return new ClientYamlTestClient(restSpec, restClient, hosts, esVersion, masterVersion,
restClientBuilder -> configureClient(restClientBuilder, restClientSettings()));
}
/**
@ -195,8 +197,7 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
}
//sort the candidates so they will always be in the same order before being shuffled, for repeatability
Collections.sort(tests,
(o1, o2) -> ((ClientYamlTestCandidate)o1[0]).getTestPath().compareTo(((ClientYamlTestCandidate)o2[0]).getTestPath()));
tests.sort(Comparator.comparing(o -> ((ClientYamlTestCandidate) o[0]).getTestPath()));
return tests;
}
@ -401,7 +402,7 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
/**
* Sniff the cluster for host metadata.
*/
private List<Node> sniffHostMetadata(RestClient client) throws IOException {
private List<Node> sniffHostMetadata() throws IOException {
ElasticsearchNodesSniffer.Scheme scheme =
ElasticsearchNodesSniffer.Scheme.valueOf(getProtocol().toUpperCase(Locale.ROOT));
ElasticsearchNodesSniffer sniffer = new ElasticsearchNodesSniffer(

View File

@ -73,7 +73,7 @@ public class ClientYamlTestSectionTests extends AbstractClientYamlTestFragmentPa
section.setSkipSection(new SkipSection(null, singletonList("node_selector"), null));
DoSection doSection = new DoSection(new XContentLocation(lineNumber, 0));
ApiCallSection apiCall = new ApiCallSection("test");
apiCall.setNodeSelector(NodeSelector.NOT_MASTER_ONLY);
apiCall.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
doSection.setApiCallSection(apiCall);
section.addExecutableSection(doSection);
}
@ -84,7 +84,7 @@ public class ClientYamlTestSectionTests extends AbstractClientYamlTestFragmentPa
section.setSkipSection(new SkipSection(null, singletonList("yaml"), null));
DoSection doSection = new DoSection(new XContentLocation(lineNumber, 0));
ApiCallSection apiCall = new ApiCallSection("test");
apiCall.setNodeSelector(NodeSelector.NOT_MASTER_ONLY);
apiCall.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
doSection.setApiCallSection(apiCall);
Exception e = expectThrows(IllegalArgumentException.class, () -> section.addExecutableSection(doSection));
assertEquals("Attempted to add a [do] with a [node_selector] section without a corresponding"

View File

@ -20,7 +20,6 @@ import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
import org.elasticsearch.xpack.test.rest.XPackRestIT;
import org.junit.After;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -58,8 +57,9 @@ public class XDocsClientYamlTestSuiteIT extends XPackRestIT {
final RestClient restClient,
final List<HttpHost> hosts,
final Version esVersion,
final Version masterVersion) throws IOException {
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion);
final Version masterVersion) {
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion,
restClientBuilder -> configureClient(restClientBuilder, restClientSettings()));
}
/**