Merge branch 'master' into index-lifecycle
This commit is contained in:
commit
1d11fdaad8
|
@ -1,5 +1,5 @@
|
|||
elasticsearch = 7.0.0-alpha1
|
||||
lucene = 7.4.0-snapshot-518d303506
|
||||
lucene = 7.4.0
|
||||
|
||||
# optional dependencies
|
||||
spatial4j = 0.7
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.util.Iterator;
|
|||
/**
|
||||
* Selects nodes that can receive requests. Used to keep requests away
|
||||
* from master nodes or to send them to nodes with a particular attribute.
|
||||
* Use with {@link RequestOptions.Builder#setNodeSelector(NodeSelector)}.
|
||||
* Use with {@link RestClientBuilder#setNodeSelector(NodeSelector)}.
|
||||
*/
|
||||
public interface NodeSelector {
|
||||
/**
|
||||
|
@ -68,7 +68,7 @@ public interface NodeSelector {
|
|||
* have the {@code master} role OR it has the data {@code data}
|
||||
* role.
|
||||
*/
|
||||
NodeSelector NOT_MASTER_ONLY = new NodeSelector() {
|
||||
NodeSelector SKIP_DEDICATED_MASTERS = new NodeSelector() {
|
||||
@Override
|
||||
public void select(Iterable<Node> nodes) {
|
||||
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
|
||||
|
@ -84,7 +84,7 @@ public interface NodeSelector {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "NOT_MASTER_ONLY";
|
||||
return "SKIP_DEDICATED_MASTERS";
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -37,22 +37,18 @@ import java.util.ArrayList;
|
|||
*/
|
||||
public final class RequestOptions {
|
||||
public static final RequestOptions DEFAULT = new Builder(
|
||||
Collections.<Header>emptyList(), NodeSelector.ANY,
|
||||
HeapBufferedResponseConsumerFactory.DEFAULT).build();
|
||||
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT).build();
|
||||
|
||||
private final List<Header> headers;
|
||||
private final NodeSelector nodeSelector;
|
||||
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
|
||||
|
||||
private RequestOptions(Builder builder) {
|
||||
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
|
||||
this.nodeSelector = builder.nodeSelector;
|
||||
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
|
||||
}
|
||||
|
||||
public Builder toBuilder() {
|
||||
Builder builder = new Builder(headers, nodeSelector, httpAsyncResponseConsumerFactory);
|
||||
return builder;
|
||||
return new Builder(headers, httpAsyncResponseConsumerFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -62,14 +58,6 @@ public final class RequestOptions {
|
|||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* The selector that chooses which nodes are valid destinations for
|
||||
* {@link Request}s with these options.
|
||||
*/
|
||||
public NodeSelector getNodeSelector() {
|
||||
return nodeSelector;
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link HttpAsyncResponseConsumerFactory} used to create one
|
||||
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
|
||||
|
@ -93,9 +81,6 @@ public final class RequestOptions {
|
|||
b.append(headers.get(h).toString());
|
||||
}
|
||||
}
|
||||
if (nodeSelector != NodeSelector.ANY) {
|
||||
b.append(", nodeSelector=").append(nodeSelector);
|
||||
}
|
||||
if (httpAsyncResponseConsumerFactory != HttpAsyncResponseConsumerFactory.DEFAULT) {
|
||||
b.append(", consumerFactory=").append(httpAsyncResponseConsumerFactory);
|
||||
}
|
||||
|
@ -113,24 +98,20 @@ public final class RequestOptions {
|
|||
|
||||
RequestOptions other = (RequestOptions) obj;
|
||||
return headers.equals(other.headers)
|
||||
&& nodeSelector.equals(other.nodeSelector)
|
||||
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(headers, nodeSelector, httpAsyncResponseConsumerFactory);
|
||||
return Objects.hash(headers, httpAsyncResponseConsumerFactory);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final List<Header> headers;
|
||||
private NodeSelector nodeSelector;
|
||||
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
|
||||
|
||||
private Builder(List<Header> headers, NodeSelector nodeSelector,
|
||||
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
|
||||
private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
|
||||
this.headers = new ArrayList<>(headers);
|
||||
this.nodeSelector = nodeSelector;
|
||||
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
|
||||
}
|
||||
|
||||
|
@ -150,14 +131,6 @@ public final class RequestOptions {
|
|||
this.headers.add(new ReqHeader(name, value));
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the selector that chooses which nodes are valid
|
||||
* destinations for {@link Request}s with these options
|
||||
*/
|
||||
public void setNodeSelector(NodeSelector nodeSelector) {
|
||||
this.nodeSelector = Objects.requireNonNull(nodeSelector, "nodeSelector cannot be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
|
||||
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
|
|||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
|
||||
import org.elasticsearch.client.DeadHostState.TimeSupplier;
|
||||
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
|
@ -74,7 +75,6 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
|
@ -108,15 +108,17 @@ public class RestClient implements Closeable {
|
|||
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
|
||||
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
|
||||
private final FailureListener failureListener;
|
||||
private final NodeSelector nodeSelector;
|
||||
private volatile NodeTuple<List<Node>> nodeTuple;
|
||||
|
||||
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
|
||||
List<Node> nodes, String pathPrefix, FailureListener failureListener) {
|
||||
List<Node> nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector) {
|
||||
this.client = client;
|
||||
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
|
||||
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
|
||||
this.failureListener = failureListener;
|
||||
this.pathPrefix = pathPrefix;
|
||||
this.nodeSelector = nodeSelector;
|
||||
setNodes(nodes);
|
||||
}
|
||||
|
||||
|
@ -146,7 +148,7 @@ public class RestClient implements Closeable {
|
|||
/**
|
||||
* Replaces the hosts with which the client communicates.
|
||||
*
|
||||
* @deprecated prefer {@link setNodes} because it allows you
|
||||
* @deprecated prefer {@link #setNodes(Collection)} because it allows you
|
||||
* to set metadata for use with {@link NodeSelector}s
|
||||
*/
|
||||
@Deprecated
|
||||
|
@ -180,8 +182,8 @@ public class RestClient implements Closeable {
|
|||
throw new IllegalArgumentException("hosts must not be null nor empty");
|
||||
}
|
||||
List<Node> nodes = new ArrayList<>(hosts.length);
|
||||
for (int i = 0; i < hosts.length; i++) {
|
||||
nodes.add(new Node(hosts[i]));
|
||||
for (HttpHost host : hosts) {
|
||||
nodes.add(new Node(host));
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
|
@ -509,7 +511,7 @@ public class RestClient implements Closeable {
|
|||
setHeaders(httpRequest, request.getOptions().getHeaders());
|
||||
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
|
||||
long startTime = System.nanoTime();
|
||||
performRequestAsync(startTime, nextNode(request.getOptions().getNodeSelector()), httpRequest, ignoreErrorCodes,
|
||||
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
|
||||
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
|
||||
}
|
||||
|
||||
|
@ -611,7 +613,7 @@ public class RestClient implements Closeable {
|
|||
* that is closest to being revived.
|
||||
* @throws IOException if no nodes are available
|
||||
*/
|
||||
private NodeTuple<Iterator<Node>> nextNode(NodeSelector nodeSelector) throws IOException {
|
||||
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
|
||||
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
|
||||
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
|
||||
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
|
||||
|
|
|
@ -55,6 +55,7 @@ public final class RestClientBuilder {
|
|||
private HttpClientConfigCallback httpClientConfigCallback;
|
||||
private RequestConfigCallback requestConfigCallback;
|
||||
private String pathPrefix;
|
||||
private NodeSelector nodeSelector = NodeSelector.ANY;
|
||||
|
||||
/**
|
||||
* Creates a new builder instance and sets the hosts that the client will send requests to.
|
||||
|
@ -173,6 +174,16 @@ public final class RestClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link NodeSelector} to be used for all requests.
|
||||
* @throws NullPointerException if the provided nodeSelector is null
|
||||
*/
|
||||
public RestClientBuilder setNodeSelector(NodeSelector nodeSelector) {
|
||||
Objects.requireNonNull(nodeSelector, "nodeSelector must not be null");
|
||||
this.nodeSelector = nodeSelector;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link RestClient} based on the provided configuration.
|
||||
*/
|
||||
|
@ -186,7 +197,8 @@ public final class RestClientBuilder {
|
|||
return createHttpClient();
|
||||
}
|
||||
});
|
||||
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, pathPrefix, failureListener);
|
||||
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
|
||||
pathPrefix, failureListener, nodeSelector);
|
||||
httpClient.start();
|
||||
return restClient;
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ public class NodeSelectorTests extends RestClientTestCase {
|
|||
Collections.shuffle(nodes, getRandom());
|
||||
List<Node> expected = new ArrayList<>(nodes);
|
||||
expected.remove(masterOnly);
|
||||
NodeSelector.NOT_MASTER_ONLY.select(nodes);
|
||||
NodeSelector.SKIP_DEDICATED_MASTERS.select(nodes);
|
||||
assertEquals(expected, nodes);
|
||||
}
|
||||
|
||||
|
|
|
@ -114,10 +114,6 @@ public class RequestOptionsTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
builder.setNodeSelector(mock(NodeSelector.class));
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
builder.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(1));
|
||||
}
|
||||
|
@ -131,15 +127,12 @@ public class RequestOptionsTests extends RestClientTestCase {
|
|||
|
||||
private static RequestOptions mutate(RequestOptions options) {
|
||||
RequestOptions.Builder mutant = options.toBuilder();
|
||||
int mutationType = between(0, 2);
|
||||
int mutationType = between(0, 1);
|
||||
switch (mutationType) {
|
||||
case 0:
|
||||
mutant.addHeader("extra", "m");
|
||||
return mutant.build();
|
||||
case 1:
|
||||
mutant.setNodeSelector(mock(NodeSelector.class));
|
||||
return mutant.build();
|
||||
case 2:
|
||||
mutant.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(5));
|
||||
return mutant.build();
|
||||
default:
|
||||
|
|
|
@ -75,14 +75,15 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|||
httpServers[i] = httpServer;
|
||||
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
|
||||
}
|
||||
restClient = buildRestClient();
|
||||
restClient = buildRestClient(NodeSelector.ANY);
|
||||
}
|
||||
|
||||
private static RestClient buildRestClient() {
|
||||
private static RestClient buildRestClient(NodeSelector nodeSelector) {
|
||||
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
|
||||
if (pathPrefix.length() > 0) {
|
||||
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
|
||||
}
|
||||
restClientBuilder.setNodeSelector(nodeSelector);
|
||||
return restClientBuilder.build();
|
||||
}
|
||||
|
||||
|
@ -199,29 +200,28 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|||
* test what happens after calling
|
||||
*/
|
||||
public void testNodeSelector() throws IOException {
|
||||
Request request = new Request("GET", "/200");
|
||||
RequestOptions.Builder options = request.getOptions().toBuilder();
|
||||
options.setNodeSelector(firstPositionNodeSelector());
|
||||
request.setOptions(options);
|
||||
int rounds = between(1, 10);
|
||||
for (int i = 0; i < rounds; i++) {
|
||||
/*
|
||||
* Run the request more than once to verify that the
|
||||
* NodeSelector overrides the round robin behavior.
|
||||
*/
|
||||
if (stoppedFirstHost) {
|
||||
try {
|
||||
restClient.performRequest(request);
|
||||
fail("expected to fail to connect");
|
||||
} catch (ConnectException e) {
|
||||
// Windows isn't consistent here. Sometimes the message is even null!
|
||||
if (false == System.getProperty("os.name").startsWith("Windows")) {
|
||||
assertEquals("Connection refused", e.getMessage());
|
||||
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
|
||||
Request request = new Request("GET", "/200");
|
||||
int rounds = between(1, 10);
|
||||
for (int i = 0; i < rounds; i++) {
|
||||
/*
|
||||
* Run the request more than once to verify that the
|
||||
* NodeSelector overrides the round robin behavior.
|
||||
*/
|
||||
if (stoppedFirstHost) {
|
||||
try {
|
||||
restClient.performRequest(request);
|
||||
fail("expected to fail to connect");
|
||||
} catch (ConnectException e) {
|
||||
// Windows isn't consistent here. Sometimes the message is even null!
|
||||
if (false == System.getProperty("os.name").startsWith("Windows")) {
|
||||
assertEquals("Connection refused", e.getMessage());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Response response = restClient.performRequest(request);
|
||||
assertEquals(httpHosts[0], response.getHost());
|
||||
}
|
||||
} else {
|
||||
Response response = restClient.performRequest(request);
|
||||
assertEquals(httpHosts[0], response.getHost());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,9 +35,7 @@ import org.apache.http.message.BasicHttpResponse;
|
|||
import org.apache.http.message.BasicStatusLine;
|
||||
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
|
||||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
|
||||
import org.elasticsearch.client.Node.Roles;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
|
@ -74,13 +72,11 @@ import static org.mockito.Mockito.when;
|
|||
public class RestClientMultipleHostsTests extends RestClientTestCase {
|
||||
|
||||
private ExecutorService exec = Executors.newFixedThreadPool(1);
|
||||
private RestClient restClient;
|
||||
private List<Node> nodes;
|
||||
private HostsTrackingFailureListener failureListener;
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void createRestClient() throws IOException {
|
||||
public RestClient createRestClient(NodeSelector nodeSelector) {
|
||||
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
|
||||
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
|
||||
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
|
||||
|
@ -119,7 +115,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
}
|
||||
nodes = Collections.unmodifiableList(nodes);
|
||||
failureListener = new HostsTrackingFailureListener();
|
||||
restClient = new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener);
|
||||
return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,12 +127,13 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
}
|
||||
|
||||
public void testRoundRobinOkStatusCodes() throws IOException {
|
||||
RestClient restClient = createRestClient(NodeSelector.ANY);
|
||||
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
Set<HttpHost> hostsSet = hostsSet();
|
||||
for (int j = 0; j < nodes.size(); j++) {
|
||||
int statusCode = randomOkStatusCode(getRandom());
|
||||
Response response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode);
|
||||
Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
|
||||
assertEquals(statusCode, response.getStatusLine().getStatusCode());
|
||||
assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost()));
|
||||
}
|
||||
|
@ -146,6 +143,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
}
|
||||
|
||||
public void testRoundRobinNoRetryErrors() throws IOException {
|
||||
RestClient restClient = createRestClient(NodeSelector.ANY);
|
||||
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
Set<HttpHost> hostsSet = hostsSet();
|
||||
|
@ -153,7 +151,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
String method = randomHttpMethod(getRandom());
|
||||
int statusCode = randomErrorNoRetryStatusCode(getRandom());
|
||||
try {
|
||||
Response response = restClient.performRequest(method, "/" + statusCode);
|
||||
Response response = restClient.performRequest(new Request(method, "/" + statusCode));
|
||||
if (method.equals("HEAD") && statusCode == 404) {
|
||||
//no exception gets thrown although we got a 404
|
||||
assertEquals(404, response.getStatusLine().getStatusCode());
|
||||
|
@ -178,9 +176,10 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
}
|
||||
|
||||
public void testRoundRobinRetryErrors() throws IOException {
|
||||
RestClient restClient = createRestClient(NodeSelector.ANY);
|
||||
String retryEndpoint = randomErrorRetryEndpoint();
|
||||
try {
|
||||
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
|
||||
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
/*
|
||||
|
@ -237,7 +236,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
for (int j = 0; j < nodes.size(); j++) {
|
||||
retryEndpoint = randomErrorRetryEndpoint();
|
||||
try {
|
||||
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
|
||||
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
Response response = e.getResponse();
|
||||
|
@ -269,7 +268,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
int statusCode = randomErrorNoRetryStatusCode(getRandom());
|
||||
Response response;
|
||||
try {
|
||||
response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode);
|
||||
response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
|
||||
} catch (ResponseException e) {
|
||||
response = e.getResponse();
|
||||
}
|
||||
|
@ -286,7 +285,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
for (int y = 0; y < i + 1; y++) {
|
||||
retryEndpoint = randomErrorRetryEndpoint();
|
||||
try {
|
||||
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
|
||||
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
Response response = e.getResponse();
|
||||
|
@ -323,6 +322,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
assertTrue(found);
|
||||
}
|
||||
};
|
||||
RestClient restClient = createRestClient(firstPositionOnly);
|
||||
int rounds = between(1, 10);
|
||||
for (int i = 0; i < rounds; i++) {
|
||||
/*
|
||||
|
@ -330,18 +330,16 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
* NodeSelector overrides the round robin behavior.
|
||||
*/
|
||||
Request request = new Request("GET", "/200");
|
||||
RequestOptions.Builder options = request.getOptions().toBuilder();
|
||||
options.setNodeSelector(firstPositionOnly);
|
||||
request.setOptions(options);
|
||||
Response response = restClient.performRequest(request);
|
||||
assertEquals(nodes.get(0).getHost(), response.getHost());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSetNodes() throws IOException {
|
||||
RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS);
|
||||
List<Node> newNodes = new ArrayList<>(nodes.size());
|
||||
for (int i = 0; i < nodes.size(); i++) {
|
||||
Roles roles = i == 0 ? new Roles(false, true, true) : new Roles(true, false, false);
|
||||
Node.Roles roles = i == 0 ? new Node.Roles(false, true, true) : new Node.Roles(true, false, false);
|
||||
newNodes.add(new Node(nodes.get(i).getHost(), null, null, null, roles, null));
|
||||
}
|
||||
restClient.setNodes(newNodes);
|
||||
|
@ -352,9 +350,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
* NodeSelector overrides the round robin behavior.
|
||||
*/
|
||||
Request request = new Request("GET", "/200");
|
||||
RequestOptions.Builder options = request.getOptions().toBuilder();
|
||||
options.setNodeSelector(NodeSelector.NOT_MASTER_ONLY);
|
||||
request.setOptions(options);
|
||||
Response response = restClient.performRequest(request);
|
||||
assertEquals(newNodes.get(0).getHost(), response.getHost());
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
node = new Node(new HttpHost("localhost", 9200));
|
||||
failureListener = new HostsTrackingFailureListener();
|
||||
restClient = new RestClient(httpClient, 10000, defaultHeaders,
|
||||
singletonList(node), null, failureListener);
|
||||
singletonList(node), null, failureListener, NodeSelector.ANY);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -54,7 +54,7 @@ public class RestClientTests extends RestClientTestCase {
|
|||
public void testCloseIsIdempotent() throws IOException {
|
||||
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
|
||||
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
|
||||
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null);
|
||||
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null);
|
||||
restClient.close();
|
||||
verify(closeableHttpAsyncClient, times(1)).close();
|
||||
restClient.close();
|
||||
|
@ -475,7 +475,7 @@ public class RestClientTests extends RestClientTestCase {
|
|||
private static RestClient createRestClient() {
|
||||
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
|
||||
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000),
|
||||
new Header[] {}, nodes, null, null);
|
||||
new Header[] {}, nodes, null, null, null);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.http.nio.entity.NStringEntity;
|
|||
import org.apache.http.ssl.SSLContextBuilder;
|
||||
import org.apache.http.ssl.SSLContexts;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.client.HasAttributeNodeSelector;
|
||||
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;
|
||||
import org.elasticsearch.client.Node;
|
||||
import org.elasticsearch.client.NodeSelector;
|
||||
|
@ -54,6 +53,7 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
|
@ -82,8 +82,7 @@ public class RestClientDocumentation {
|
|||
static {
|
||||
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
|
||||
builder.addHeader("Authorization", "Bearer " + TOKEN); // <1>
|
||||
builder.setNodeSelector(NodeSelector.NOT_MASTER_ONLY); // <2>
|
||||
builder.setHttpAsyncResponseConsumerFactory( // <3>
|
||||
builder.setHttpAsyncResponseConsumerFactory( // <2>
|
||||
new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
|
||||
COMMON_OPTIONS = builder.build();
|
||||
}
|
||||
|
@ -115,6 +114,45 @@ public class RestClientDocumentation {
|
|||
builder.setMaxRetryTimeoutMillis(10000); // <1>
|
||||
//end::rest-client-init-max-retry-timeout
|
||||
}
|
||||
{
|
||||
//tag::rest-client-init-node-selector
|
||||
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
|
||||
builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS); // <1>
|
||||
//end::rest-client-init-node-selector
|
||||
}
|
||||
{
|
||||
//tag::rest-client-init-allocation-aware-selector
|
||||
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
|
||||
builder.setNodeSelector(new NodeSelector() { // <1>
|
||||
@Override
|
||||
public void select(Iterable<Node> nodes) {
|
||||
/*
|
||||
* Prefer any node that belongs to rack_one. If none is around
|
||||
* we will go to another rack till it's time to try and revive
|
||||
* some of the nodes that belong to rack_one.
|
||||
*/
|
||||
boolean foundOne = false;
|
||||
for (Node node : nodes) {
|
||||
String rackId = node.getAttributes().get("rack_id").get(0);
|
||||
if ("rack_one".equals(rackId)) {
|
||||
foundOne = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (foundOne) {
|
||||
Iterator<Node> nodesIt = nodes.iterator();
|
||||
while (nodesIt.hasNext()) {
|
||||
Node node = nodesIt.next();
|
||||
String rackId = node.getAttributes().get("rack_id").get(0);
|
||||
if ("rack_one".equals(rackId) == false) {
|
||||
nodesIt.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
//end::rest-client-init-allocation-aware-selector
|
||||
}
|
||||
{
|
||||
//tag::rest-client-init-failure-listener
|
||||
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
|
||||
|
@ -198,13 +236,6 @@ public class RestClientDocumentation {
|
|||
request.setOptions(options);
|
||||
//end::rest-client-options-customize-header
|
||||
}
|
||||
{
|
||||
//tag::rest-client-options-customize-attribute
|
||||
RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
|
||||
options.setNodeSelector(new HasAttributeNodeSelector("rack", "c12")); // <1>
|
||||
request.setOptions(options);
|
||||
//end::rest-client-options-customize-attribute
|
||||
}
|
||||
}
|
||||
{
|
||||
HttpEntity[] documents = new HttpEntity[10];
|
||||
|
|
|
@ -99,3 +99,30 @@ http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`netwo
|
|||
to your
|
||||
http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java
|
||||
security policy].
|
||||
|
||||
=== Node selector
|
||||
|
||||
The client sends each request to one of the configured nodes in round-robin
|
||||
fashion. Nodes can optionally be filtered through a node selector that needs
|
||||
to be provided when initializing the client. This is useful when sniffing is
|
||||
enabled, in case only dedicated master nodes should be hit by HTTP requests.
|
||||
For each request the client will run the eventually configured node selector
|
||||
to filter the node candidates, then select the next one in the list out of the
|
||||
remaining ones.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-allocation-aware-selector]
|
||||
--------------------------------------------------
|
||||
<1> Set an allocation aware node selector that allows to pick a node in the
|
||||
local rack if any available, otherwise go to any other node in any rack. It
|
||||
acts as a preference rather than a strict requirement, given that it goes to
|
||||
another rack if none of the local nodes are available, rather than returning
|
||||
no nodes in such case which would make the client forcibly revive a local node
|
||||
whenever none of the nodes from the preferred rack is available.
|
||||
|
||||
WARNING: Node selectors that do not consistently select the same set of nodes
|
||||
will make round-robin behaviour unpredictable and possibly unfair. The
|
||||
preference example above is fine as it reasons about availability of nodes
|
||||
which already affects the predictability of round-robin. Node selection should
|
||||
not depend on other external factors or round-robin will not work properly.
|
||||
|
|
|
@ -196,6 +196,16 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failur
|
|||
<1> Set a listener that gets notified every time a node fails, in case actions
|
||||
need to be taken. Used internally when sniffing on failure is enabled.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-node-selector]
|
||||
--------------------------------------------------
|
||||
<1> Set the node selector to be used to filter the nodes the client will send
|
||||
requests to among the ones that are set to the client itself. This is useful
|
||||
for instance to prevent sending requests to dedicated master nodes when
|
||||
sniffing is enabled. By default the client sends requests to every configured
|
||||
node.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-request-config-callback]
|
||||
|
@ -283,8 +293,7 @@ instance and share it between all requests:
|
|||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-singleton]
|
||||
--------------------------------------------------
|
||||
<1> Add any headers needed by all requests.
|
||||
<2> Set a `NodeSelector`.
|
||||
<3> Customize the response consumer.
|
||||
<2> Customize the response consumer.
|
||||
|
||||
`addHeader` is for headers that are required for authorization or to work with
|
||||
a proxy in front of Elasticsearch. There is no need to set the `Content-Type`
|
||||
|
@ -315,15 +324,6 @@ adds an extra header:
|
|||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-customize-header]
|
||||
--------------------------------------------------
|
||||
|
||||
Or you can send requests to nodes with a particular attribute:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-customize-attribute]
|
||||
--------------------------------------------------
|
||||
<1> Replace the node selector with one that selects nodes on a particular rack.
|
||||
|
||||
|
||||
==== Multiple parallel asynchronous actions
|
||||
|
||||
The client is quite happy to execute many actions in parallel. The following
|
||||
|
|
|
@ -91,8 +91,9 @@ public class DocsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
|
|||
final RestClient restClient,
|
||||
final List<HttpHost> hosts,
|
||||
final Version esVersion,
|
||||
final Version masterVersion) throws IOException {
|
||||
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion);
|
||||
final Version masterVersion) {
|
||||
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion,
|
||||
restClientBuilder -> configureClient(restClientBuilder, restClientSettings()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
a57659a275921d8ab3f7ec580e9bf713ce6143b1
|
|
@ -0,0 +1 @@
|
|||
9f0a326f7ec1671ffb07f95b27f1a5812b7dc1c3
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -91,7 +92,7 @@ public class Netty4ScheduledPingTests extends ESTestCase {
|
|||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
b91a260d8d12ee4b3302a63059c73a34de0ce146
|
|
@ -0,0 +1 @@
|
|||
394e811e9d9bf0b9fba837f7ceca9e8f3e39d1c2
|
|
@ -1 +0,0 @@
|
|||
cc1ca9bd9e2c162dd1da8c2e7111913fd8033e48
|
|
@ -0,0 +1 @@
|
|||
5cd56acfa16ba20e19b5d21d90b510eada841431
|
|
@ -1 +0,0 @@
|
|||
2fa3662a10a9e085b1c7b87293d727422cbe6224
|
|
@ -0,0 +1 @@
|
|||
db7b56f4cf533ad9022d2312c5ee48331edccca3
|
|
@ -1 +0,0 @@
|
|||
60aa50c11857e6739e68936cb45102562b2c46b4
|
|
@ -0,0 +1 @@
|
|||
e8dba4d28a595eab2e8fb6095d1ac5f2d3872144
|
|
@ -1 +0,0 @@
|
|||
4586368007785a3be26db4b9ce404ffb8c76f350
|
|
@ -0,0 +1 @@
|
|||
1243c771ee824c46a3d66ae3e4256d919fc06fbe
|
|
@ -1 +0,0 @@
|
|||
9c6d030ab2c148df7a6ba73a774ef4b8c720a6cb
|
|
@ -0,0 +1 @@
|
|||
c783794b0d20d8dc1285edc7701f386b1f0e2fb8
|
|
@ -1 +0,0 @@
|
|||
8275bf8df2644d5fcec2963cf237d14b6e00fefe
|
|
@ -0,0 +1 @@
|
|||
9438efa504a89afb6cb4c66448c257f865164d23
|
|
@ -132,7 +132,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
|||
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
|
||||
*/
|
||||
@Override
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
|
||||
if (refreshInterval.millis() != 0) {
|
||||
if (dynamicHosts != null &&
|
||||
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
|
||||
|
|
|
@ -92,7 +92,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
|
||||
return dynamicHosts.getOrRefresh();
|
||||
}
|
||||
|
||||
|
|
|
@ -93,7 +93,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
|
||||
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
|
||||
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
|
||||
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
|
||||
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
|
||||
logger.debug("--> addresses found: {}", dynamicHosts);
|
||||
return dynamicHosts;
|
||||
} catch (IOException e) {
|
||||
|
@ -307,7 +307,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
for (int i=0; i<3; i++) {
|
||||
provider.buildDynamicHosts();
|
||||
provider.buildDynamicHosts(null);
|
||||
}
|
||||
assertThat(provider.fetchCount, is(3));
|
||||
}
|
||||
|
@ -324,12 +324,12 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
for (int i=0; i<3; i++) {
|
||||
provider.buildDynamicHosts();
|
||||
provider.buildDynamicHosts(null);
|
||||
}
|
||||
assertThat(provider.fetchCount, is(1));
|
||||
Thread.sleep(1_000L); // wait for cache to expire
|
||||
for (int i=0; i<3; i++) {
|
||||
provider.buildDynamicHosts();
|
||||
provider.buildDynamicHosts(null);
|
||||
}
|
||||
assertThat(provider.fetchCount, is(2));
|
||||
}
|
||||
|
|
|
@ -19,35 +19,17 @@
|
|||
|
||||
package org.elasticsearch.discovery.file;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
|
@ -57,47 +39,19 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
|
||||
|
||||
private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);
|
||||
|
||||
private final Settings settings;
|
||||
private final Path configPath;
|
||||
private ExecutorService fileBasedDiscoveryExecutorService;
|
||||
|
||||
public FileBasedDiscoveryPlugin(Settings settings, Path configPath) {
|
||||
this.settings = settings;
|
||||
this.configPath = configPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
|
||||
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
|
||||
Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve",
|
||||
0,
|
||||
concurrentConnects,
|
||||
60,
|
||||
TimeUnit.SECONDS,
|
||||
threadFactory,
|
||||
threadPool.getThreadContext());
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
|
||||
NetworkService networkService) {
|
||||
return Collections.singletonMap(
|
||||
"file",
|
||||
() -> new FileBasedUnicastHostsProvider(
|
||||
new Environment(settings, configPath), transportService, fileBasedDiscoveryExecutorService));
|
||||
() -> new FileBasedUnicastHostsProvider(new Environment(settings, configPath)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,26 +23,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
|
||||
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;
|
||||
|
||||
/**
|
||||
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
|
||||
* from {@link #UNICAST_HOSTS_FILE}.
|
||||
|
@ -59,23 +52,15 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
|
|||
|
||||
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
|
||||
|
||||
private final TransportService transportService;
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private final Path unicastHostsFilePath;
|
||||
|
||||
private final TimeValue resolveTimeout;
|
||||
|
||||
FileBasedUnicastHostsProvider(Environment environment, TransportService transportService, ExecutorService executorService) {
|
||||
FileBasedUnicastHostsProvider(Environment environment) {
|
||||
super(environment.settings());
|
||||
this.transportService = transportService;
|
||||
this.executorService = executorService;
|
||||
this.unicastHostsFilePath = environment.configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
|
||||
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
|
||||
List<String> hostsList;
|
||||
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
|
||||
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
|
||||
|
@ -90,21 +75,8 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
|
|||
hostsList = Collections.emptyList();
|
||||
}
|
||||
|
||||
final List<TransportAddress> dynamicHosts = new ArrayList<>();
|
||||
try {
|
||||
dynamicHosts.addAll(resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
hostsList,
|
||||
1,
|
||||
transportService,
|
||||
resolveTimeout));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
final List<TransportAddress> dynamicHosts = hostsResolver.resolveHosts(hostsList, 1);
|
||||
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);
|
||||
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,9 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
|
@ -123,8 +125,10 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
|||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
|
||||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
|
||||
final List<TransportAddress> addresses = provider.buildDynamicHosts();
|
||||
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment);
|
||||
final List<TransportAddress> addresses = provider.buildDynamicHosts((hosts, limitPortCounts) ->
|
||||
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
|
||||
TimeValue.timeValueSeconds(10)));
|
||||
assertEquals(0, addresses.size());
|
||||
}
|
||||
|
||||
|
@ -163,6 +167,8 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
return new FileBasedUnicastHostsProvider(
|
||||
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
|
||||
new Environment(settings, configPath)).buildDynamicHosts((hosts, limitPortCounts) ->
|
||||
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
|
||||
TimeValue.timeValueSeconds(10)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
* Information can be cached using `cloud.gce.refresh_interval` property if needed.
|
||||
*/
|
||||
@Override
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
|
||||
// We check that needed properties have been set
|
||||
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
|
||||
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +
|
||||
|
|
|
@ -108,7 +108,7 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
|
||||
transportService, new NetworkService(Collections.emptyList()));
|
||||
|
||||
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
|
||||
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
|
||||
logger.info("--> addresses found: {}", dynamicHosts);
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
|
|
@ -103,12 +103,12 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
|
|||
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
|
||||
try {
|
||||
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
|
||||
(request, channel) -> {
|
||||
(request, channel, task) -> {
|
||||
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
|
||||
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
|
||||
});
|
||||
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
|
||||
(request, channel) -> {
|
||||
(request, channel, task) -> {
|
||||
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
|
||||
for (DiscoveryNode node : knownNodes) {
|
||||
builder.add(node);
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
557d62d2b13d3dcb1810a1633e22625e42425425
|
|
@ -0,0 +1 @@
|
|||
e1afb580df500626a1c695e0fc9a7e8a8f58bcac
|
|
@ -1 +0,0 @@
|
|||
d3755ad4c98b49fe5055b32358e3071727177c03
|
|
@ -0,0 +1 @@
|
|||
a6ad941ef1fdad48673ed511631b7e48a9456bf7
|
|
@ -1 +0,0 @@
|
|||
c1bbf611535f0b0fd0ba14e8da67c8d645b95244
|
|
@ -0,0 +1 @@
|
|||
730d9ac80436c8cbc0b2a8a749259be536b97316
|
|
@ -1 +0,0 @@
|
|||
b62ebd53bbefb2f59cd246157a6768cae8a5a3a1
|
|
@ -0,0 +1 @@
|
|||
56f99858a4421a517b52da36a222debcccab80c6
|
|
@ -1 +0,0 @@
|
|||
cba0fd4ccb98db8a72287a95d6b653e455f9eeb3
|
|
@ -0,0 +1 @@
|
|||
5266b45d7f049662817d739881765904621876d0
|
|
@ -1 +0,0 @@
|
|||
5127ed0b7516f8b28d84e837df4f33c67e361f6c
|
|
@ -0,0 +1 @@
|
|||
c77154d18c4944ceb6ce0741060632f57d623fdc
|
|
@ -1 +0,0 @@
|
|||
45c7b13aae1104f9f5f0fca0606e5741309c8d74
|
|
@ -0,0 +1 @@
|
|||
186ff981feec1bdbf1a6236e786ec171b5fbe3e0
|
|
@ -1 +0,0 @@
|
|||
2540c4b5d9dca8a39a3b4d58efe4ab484df7254f
|
|
@ -0,0 +1 @@
|
|||
bf844bb6f6d84da19e8c79ce5fbb4cf6d00f2611
|
|
@ -1 +0,0 @@
|
|||
e9d0c0c020917d4bf9b590526866ff5547dbaa17
|
|
@ -0,0 +1 @@
|
|||
229a50e6d9d4db076f671c230d493000c6e2972c
|
|
@ -1 +0,0 @@
|
|||
50969cdb7279047fbec94dda6e7d74d1c73c07f8
|
|
@ -0,0 +1 @@
|
|||
8e58add0d0c39df97d07c8e343041989bf4b3a3f
|
|
@ -1 +0,0 @@
|
|||
94524b293572b1f0d01a0faeeade1ff24713f966
|
|
@ -0,0 +1 @@
|
|||
1692604fa06a945d1ee19939022ef1a912235db3
|
|
@ -1 +0,0 @@
|
|||
878db723e41ece636ed338c4ef374e900f221a14
|
|
@ -0,0 +1 @@
|
|||
847d2f897961124e2fc7d5e55d8309635bb026bc
|
|
@ -1 +0,0 @@
|
|||
c8dc85c32aeac6ff320aa6a9ea57881ad4847a55
|
|
@ -0,0 +1 @@
|
|||
586892eefc0546643d7f5d7f83659c7db0d534ff
|
|
@ -1 +0,0 @@
|
|||
203d8d22ab172e624784a5fdeaecdd01ae25fb3d
|
|
@ -0,0 +1 @@
|
|||
32cd2854f39ff453a5d128ce40e11eea4168abbf
|
|
@ -1 +0,0 @@
|
|||
4d6cf8fa1064a86991d5cd12a2ed32119ac91212
|
|
@ -0,0 +1 @@
|
|||
0cdc1a512032f8b23dd4b1add0f5cd06325addc3
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.node.liveness;
|
|||
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
|
@ -39,7 +40,7 @@ public final class TransportLivenessAction implements TransportRequestHandler<Li
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(LivenessRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
channel.sendResponse(new LivenessResponse(clusterService.getClusterName(), clusterService.localNode()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -285,7 +286,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
|
||||
class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
|
||||
@Override
|
||||
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
if (request.ban) {
|
||||
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
|
||||
clusterService.localNode().getId(), request.reason);
|
||||
|
|
|
@ -45,13 +45,10 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
|||
import org.elasticsearch.search.query.QuerySearchRequest;
|
||||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
import org.elasticsearch.search.query.ScrollQuerySearchResult;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportActionProxy;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
|
@ -314,150 +311,116 @@ public class SearchTransportService extends AbstractComponent {
|
|||
|
||||
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
|
||||
@Override
|
||||
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
boolean freed = searchService.freeContext(request.id());
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
}
|
||||
});
|
||||
(request, channel, task) -> {
|
||||
boolean freed = searchService.freeContext(request.id());
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME,
|
||||
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
|
||||
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
|
||||
@Override
|
||||
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
boolean freed = searchService.freeContext(request.id());
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
}
|
||||
});
|
||||
(request, channel, task) -> {
|
||||
boolean freed = searchService.freeContext(request.id());
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME,
|
||||
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
|
||||
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
|
||||
ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception {
|
||||
searchService.freeAllScrollContexts();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
});
|
||||
ThreadPool.Names.SAME, (request, channel, task) -> {
|
||||
searchService.freeAllScrollContexts();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
|
||||
() -> TransportResponse.Empty.INSTANCE);
|
||||
|
||||
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
|
||||
@Override
|
||||
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
|
||||
@Override
|
||||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
try {
|
||||
channel.sendResponse(searchPhaseResult);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
|
||||
@Override
|
||||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
try {
|
||||
channel.sendResponse(searchPhaseResult);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
throw new UncheckedIOException(e1);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
throw new UncheckedIOException(e1);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
|
||||
@Override
|
||||
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
|
||||
@Override
|
||||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
try {
|
||||
channel.sendResponse(searchPhaseResult);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
|
||||
@Override
|
||||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
try {
|
||||
channel.sendResponse(searchPhaseResult);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
throw new UncheckedIOException(e1);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
throw new UncheckedIOException(e1);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
|
||||
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
|
||||
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
|
||||
@Override
|
||||
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
|
||||
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
|
||||
@Override
|
||||
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
|
||||
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
|
||||
@Override
|
||||
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
|
||||
@Override
|
||||
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
|
||||
@Override
|
||||
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
|
||||
channel.sendResponse(result);
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
|
||||
|
||||
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
|
||||
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
|
||||
@Override
|
||||
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
boolean canMatch = searchService.canMatch(request);
|
||||
channel.sendResponse(new CanMatchResponse(canMatch));
|
||||
}
|
||||
(request, channel, task) -> {
|
||||
boolean canMatch = searchService.canMatch(request);
|
||||
channel.sendResponse(new CanMatchResponse(canMatch));
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
|
||||
(Supplier<TransportResponse>) CanMatchResponse::new);
|
||||
|
|
|
@ -64,11 +64,6 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
|
|||
|
||||
class TransportHandler implements TransportRequestHandler<Request> {
|
||||
|
||||
@Override
|
||||
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
// We already got the task created on the network layer - no need to create it again on the transport layer
|
||||
|
|
|
@ -284,10 +284,5 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
channel.sendResponse(shardOperation(request, task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -393,7 +393,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
|
||||
class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler<NodeRequest> {
|
||||
@Override
|
||||
public void messageReceived(final NodeRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final NodeRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
List<ShardRouting> shards = request.getShards();
|
||||
final int totalShards = shards.size();
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
|
|
@ -258,12 +258,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
channel.sendResponse(nodeOperation(request, task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
|
||||
channel.sendResponse(nodeOperation(request));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -273,11 +273,6 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(Request request, TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
}
|
||||
|
||||
protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
|
||||
|
@ -286,11 +281,6 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
|
||||
new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
|
||||
|
@ -493,12 +483,6 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(
|
||||
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest, final TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(
|
||||
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -243,7 +244,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|||
private class ShardTransportHandler implements TransportRequestHandler<Request> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
shardOperation(request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.common.logging.LoggerMessageFormat;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
|
@ -271,7 +272,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
|||
private class TransportHandler implements TransportRequestHandler<Request> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
// if we have a local operation, execute it on a thread since we don't spawn
|
||||
execute(request, new ActionListener<Response>() {
|
||||
@Override
|
||||
|
@ -298,7 +299,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
|||
private class ShardTransportHandler implements TransportRequestHandler<Request> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
|
||||
}
|
||||
|
|
|
@ -338,7 +338,7 @@ public abstract class TransportTasksAction<
|
|||
class NodeTransportHandler implements TransportRequestHandler<NodeTaskRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
nodeOperation(request, new ActionListener<NodeTasksResponse>() {
|
||||
@Override
|
||||
public void onResponse(
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -65,7 +66,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
|
|||
private class NodeMappingRefreshTransportHandler implements TransportRequestHandler<NodeMappingRefreshRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
metaDataMappingService.refreshMapping(request.index(), request.indexUUID());
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
|
@ -237,7 +238,7 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(FailedShardEntry request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) throws Exception {
|
||||
logger.debug(() -> new ParameterizedMessage("{} received shard failed for {}", request.shardId, request), request.failure);
|
||||
clusterService.submitStateUpdateTask(
|
||||
"shard-failed",
|
||||
|
@ -487,7 +488,7 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(StartedShardEntry request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) throws Exception {
|
||||
logger.debug("{} received shard started for [{}]", request.shardId, request);
|
||||
clusterService.submitStateUpdateTask(
|
||||
"shard-started " + request,
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.elasticsearch.discovery.DiscoveryModule;
|
|||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.env.Environment;
|
||||
|
@ -357,7 +358,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
|
||||
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
|
||||
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
|
||||
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
|
||||
SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
|
||||
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
|
||||
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
|
||||
SearchService.DEFAULT_KEEPALIVE_SETTING,
|
||||
|
|
|
@ -31,7 +31,9 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
|
||||
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
|
@ -42,13 +44,15 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A module for loading classes for node discovery.
|
||||
|
@ -57,8 +61,8 @@ public class DiscoveryModule {
|
|||
|
||||
public static final Setting<String> DISCOVERY_TYPE_SETTING =
|
||||
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
|
||||
public static final Setting<Optional<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
|
||||
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
|
||||
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
|
||||
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);
|
||||
|
||||
private final Discovery discovery;
|
||||
|
||||
|
@ -66,9 +70,9 @@ public class DiscoveryModule {
|
|||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
|
||||
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
|
||||
AllocationService allocationService) {
|
||||
final UnicastHostsProvider hostsProvider;
|
||||
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
|
||||
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
|
||||
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
|
||||
|
@ -80,17 +84,32 @@ public class DiscoveryModule {
|
|||
joinValidators.add(joinValidator);
|
||||
}
|
||||
}
|
||||
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
|
||||
if (hostsProviderName.isPresent()) {
|
||||
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName.get());
|
||||
if (hostsProviderSupplier == null) {
|
||||
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]");
|
||||
}
|
||||
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
|
||||
} else {
|
||||
hostsProvider = Collections::emptyList;
|
||||
List<String> hostsProviderNames = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
|
||||
// for bwc purposes, add settings provider even if not explicitly specified
|
||||
if (hostsProviderNames.contains("settings") == false) {
|
||||
List<String> extendedHostsProviderNames = new ArrayList<>();
|
||||
extendedHostsProviderNames.add("settings");
|
||||
extendedHostsProviderNames.addAll(hostsProviderNames);
|
||||
hostsProviderNames = extendedHostsProviderNames;
|
||||
}
|
||||
|
||||
final Set<String> missingProviderNames = new HashSet<>(hostsProviderNames);
|
||||
missingProviderNames.removeAll(hostProviders.keySet());
|
||||
if (missingProviderNames.isEmpty() == false) {
|
||||
throw new IllegalArgumentException("Unknown zen hosts providers " + missingProviderNames);
|
||||
}
|
||||
|
||||
List<UnicastHostsProvider> filteredHostsProviders = hostsProviderNames.stream()
|
||||
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());
|
||||
|
||||
final UnicastHostsProvider hostsProvider = hostsResolver -> {
|
||||
final List<TransportAddress> addresses = new ArrayList<>();
|
||||
for (UnicastHostsProvider provider : filteredHostsProviders) {
|
||||
addresses.addAll(provider.buildDynamicHosts(hostsResolver));
|
||||
}
|
||||
return Collections.unmodifiableList(addresses);
|
||||
};
|
||||
|
||||
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
|
||||
discoveryTypes.put("zen",
|
||||
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -321,7 +322,7 @@ public class MasterFaultDetection extends FaultDetection {
|
|||
private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final MasterPingRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
|
||||
// check if we are really the same master as the one we seemed to be think we are
|
||||
// this can happen if the master got "kill -9" and then another node started using the same port
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -133,7 +134,7 @@ public class MembershipAction extends AbstractComponent {
|
|||
private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final JoinRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
listener.onJoin(request.node, new JoinCallback() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
|
@ -190,7 +191,7 @@ public class MembershipAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(ValidateJoinRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
DiscoveryNode node = localNodeSupplier.get();
|
||||
assert node != null : "local node is null";
|
||||
joinValidators.stream().forEach(action -> action.accept(node, request.state));
|
||||
|
@ -281,7 +282,7 @@ public class MembershipAction extends AbstractComponent {
|
|||
private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(LeaveRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
listener.onLeave(request.node);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -276,7 +277,7 @@ public class NodesFaultDetection extends FaultDetection {
|
|||
|
||||
class PingRequestHandler implements TransportRequestHandler<PingRequest> {
|
||||
@Override
|
||||
public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(PingRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
// if we are not the node we are supposed to be pinged, send an exception
|
||||
// this can happen when a kill -9 is sent, and another node is started using the same port
|
||||
if (!localNode.equals(request.targetNode())) {
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
|
|||
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BytesTransportRequest;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
|
@ -447,14 +448,14 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
handleIncomingClusterStateRequest(request, channel);
|
||||
}
|
||||
}
|
||||
|
||||
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
|
||||
@Override
|
||||
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
handleCommitRequest(request, channel);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
/**
|
||||
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
|
||||
* from the "discovery.zen.ping.unicast.hosts" node setting. If the port is
|
||||
* left off an entry, a default port of 9300 is assumed.
|
||||
*
|
||||
* An example unicast hosts setting might look as follows:
|
||||
* [67.81.244.10, 67.81.244.11:9305, 67.81.244.15:9400]
|
||||
*/
|
||||
public class SettingsBasedHostsProvider extends AbstractComponent implements UnicastHostsProvider {
|
||||
|
||||
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
|
||||
Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Setting.Property.NodeScope);
|
||||
|
||||
// these limits are per-address
|
||||
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
|
||||
public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
|
||||
|
||||
private final List<String> configuredHosts;
|
||||
|
||||
private final int limitPortCounts;
|
||||
|
||||
public SettingsBasedHostsProvider(Settings settings, TransportService transportService) {
|
||||
super(settings);
|
||||
|
||||
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
|
||||
configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
|
||||
// we only limit to 1 address, makes no sense to ping 100 ports
|
||||
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
|
||||
} else {
|
||||
// if unicast hosts are not specified, fill with simple defaults on the local machine
|
||||
configuredHosts = transportService.getLocalAddresses();
|
||||
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
|
||||
}
|
||||
|
||||
logger.debug("using initial hosts {}", configuredHosts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
|
||||
return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
|
||||
}
|
||||
|
||||
}
|
|
@ -31,5 +31,15 @@ public interface UnicastHostsProvider {
|
|||
/**
|
||||
* Builds the dynamic list of unicast hosts to be used for unicast discovery.
|
||||
*/
|
||||
List<TransportAddress> buildDynamicHosts();
|
||||
List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver);
|
||||
|
||||
/**
|
||||
* Helper object that allows to resolve a list of hosts to a list of transport addresses.
|
||||
* Each host is resolved into a transport address (or a collection of addresses if the
|
||||
* number of ports is greater than one)
|
||||
*/
|
||||
interface HostsResolver {
|
||||
List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
|
@ -82,11 +83,9 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
|
@ -94,26 +93,15 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
|
|||
public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
||||
|
||||
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
|
||||
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
|
||||
Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(),
|
||||
Property.NodeScope);
|
||||
public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
|
||||
Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope);
|
||||
public static final Setting<TimeValue> DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
|
||||
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Property.NodeScope);
|
||||
|
||||
// these limits are per-address
|
||||
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
|
||||
public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final TransportService transportService;
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private final List<String> configuredHosts;
|
||||
|
||||
private final int limitPortCounts;
|
||||
|
||||
private final PingContextProvider contextProvider;
|
||||
|
||||
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
|
||||
|
@ -141,19 +129,10 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
this.contextProvider = contextProvider;
|
||||
|
||||
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
|
||||
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
|
||||
configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
|
||||
// we only limit to 1 addresses, makes no sense to ping 100 ports
|
||||
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
|
||||
} else {
|
||||
// if unicast hosts are not specified, fill with simple defaults on the local machine
|
||||
configuredHosts = transportService.getLocalAddresses();
|
||||
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
|
||||
}
|
||||
|
||||
resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
|
||||
logger.debug(
|
||||
"using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]",
|
||||
configuredHosts,
|
||||
"using concurrent_connects [{}], resolve_timeout [{}]",
|
||||
concurrentConnects,
|
||||
resolveTimeout);
|
||||
|
||||
|
@ -172,9 +151,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
}
|
||||
|
||||
/**
|
||||
* Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses
|
||||
* if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done
|
||||
* in parallel using specified executor service up to the specified resolve timeout.
|
||||
* Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of
|
||||
* addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up
|
||||
* to the specified resolve timeout.
|
||||
*
|
||||
* @param executorService the executor service used to parallelize hostname lookups
|
||||
* @param logger logger used for logging messages regarding hostname lookups
|
||||
|
@ -190,7 +169,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
final List<String> hosts,
|
||||
final int limitPortCounts,
|
||||
final TransportService transportService,
|
||||
final TimeValue resolveTimeout) throws InterruptedException {
|
||||
final TimeValue resolveTimeout) {
|
||||
Objects.requireNonNull(executorService);
|
||||
Objects.requireNonNull(logger);
|
||||
Objects.requireNonNull(hosts);
|
||||
|
@ -205,8 +184,13 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
.stream()
|
||||
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
|
||||
.collect(Collectors.toList());
|
||||
final List<Future<TransportAddress[]>> futures =
|
||||
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
|
||||
final List<Future<TransportAddress[]>> futures;
|
||||
try {
|
||||
futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return Collections.emptyList();
|
||||
}
|
||||
final List<TransportAddress> transportAddresses = new ArrayList<>();
|
||||
final Set<TransportAddress> localAddresses = new HashSet<>();
|
||||
localAddresses.add(transportService.boundAddress().publishAddress());
|
||||
|
@ -232,6 +216,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
assert e.getCause() != null;
|
||||
final String message = "failed to resolve host [" + hostname + "]";
|
||||
logger.warn(message, e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
// ignore
|
||||
}
|
||||
} else {
|
||||
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
|
||||
|
@ -240,6 +227,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
return Collections.unmodifiableList(transportAddresses);
|
||||
}
|
||||
|
||||
private UnicastHostsProvider.HostsResolver createHostsResolver() {
|
||||
return (hosts, limitPortCounts) -> resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
|
||||
limitPortCounts, transportService, resolveTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
|
||||
|
@ -281,18 +273,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
final TimeValue scheduleDuration,
|
||||
final TimeValue requestDuration) {
|
||||
final List<TransportAddress> seedAddresses = new ArrayList<>();
|
||||
try {
|
||||
seedAddresses.addAll(resolveHostsLists(
|
||||
unicastZenPingExecutorService,
|
||||
logger,
|
||||
configuredHosts,
|
||||
limitPortCounts,
|
||||
transportService,
|
||||
resolveTimeout));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
seedAddresses.addAll(hostsProvider.buildDynamicHosts());
|
||||
seedAddresses.addAll(hostsProvider.buildDynamicHosts(createHostsResolver()));
|
||||
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
|
||||
// add all possible master nodes that were active in the last known cluster configuration
|
||||
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
|
||||
|
@ -583,7 +564,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
if (closed) {
|
||||
throw new AlreadyClosedException("node is shutting down");
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.elasticsearch.discovery.Discovery;
|
|||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.DiscoveryStats;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -1187,7 +1188,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
class RejoinClusterRequestHandler implements TransportRequestHandler<RejoinClusterRequest> {
|
||||
@Override
|
||||
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
|
@ -112,7 +113,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
|
|||
|
||||
class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {
|
||||
@Override
|
||||
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
String[] indexNames = new String[request.indices.length];
|
||||
for (int i = 0; i < request.indices.length; i++) {
|
||||
indexNames[i] = request.indices[i].getIndex().getName();
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
import org.elasticsearch.indices.IndexClosedException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
|
@ -778,7 +779,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
|||
private final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreShardSyncedFlushRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
channel.sendResponse(performPreSyncedFlush(request));
|
||||
}
|
||||
}
|
||||
|
@ -786,7 +787,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
|||
private final class SyncedFlushTransportHandler implements TransportRequestHandler<ShardSyncedFlushRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(ShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(ShardSyncedFlushRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
channel.sendResponse(performSyncedFlush(request));
|
||||
}
|
||||
}
|
||||
|
@ -794,7 +795,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
|||
private final class InFlightOpCountTransportHandler implements TransportRequestHandler<InFlightOpsRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(InFlightOpsRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(InFlightOpsRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
channel.sendResponse(performInFlightOps(request));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
|
|||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
|
@ -103,7 +104,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
|
|||
|
||||
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
|
||||
@Override
|
||||
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
RecoveryResponse response = recover(request);
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.elasticsearch.index.translog.Translog;
|
|||
import org.elasticsearch.index.translog.TranslogCorruptedException;
|
||||
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.FutureTransportResponseHandler;
|
||||
|
@ -397,7 +398,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
|
||||
Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
|
||||
)) {
|
||||
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
|
||||
|
@ -409,7 +411,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef =
|
||||
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
|
||||
recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
|
||||
|
@ -421,7 +423,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
|
||||
)) {
|
||||
recoveryRef.target().ensureClusterStateVersion(request.clusterStateVersion());
|
||||
|
@ -433,7 +435,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel,
|
||||
Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
|
||||
recoveryRef.target().handoffPrimaryContext(request.primaryContext());
|
||||
}
|
||||
|
@ -445,7 +448,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws IOException {
|
||||
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
|
||||
Task task) throws IOException {
|
||||
try (RecoveryRef recoveryRef =
|
||||
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
|
||||
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
|
||||
|
@ -463,7 +467,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
try {
|
||||
messageReceived(request, channel);
|
||||
messageReceived(request, channel, task);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
|
@ -537,7 +541,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
|
||||
)) {
|
||||
recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames,
|
||||
|
@ -550,7 +554,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
|
||||
)) {
|
||||
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
|
||||
|
@ -565,7 +569,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
final AtomicLong bytesSinceLastPause = new AtomicLong();
|
||||
|
||||
@Override
|
||||
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
|
||||
)) {
|
||||
final RecoveryTarget recoveryTarget = recoveryRef.target();
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.index.shard.IndexShard;
|
|||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
|
@ -299,7 +300,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
private class ShardActiveRequestHandler implements TransportRequestHandler<ShardActiveRequest> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception {
|
||||
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
IndexShard indexShard = getShard(request);
|
||||
|
||||
// make sure shard is really there before register cluster state observer
|
||||
|
|
|
@ -349,7 +349,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
|
|||
Repository previous = repositories.get(repositoryMetaData.name());
|
||||
if (previous != null) {
|
||||
RepositoryMetaData previousMetadata = previous.getMetadata();
|
||||
if (!previousMetadata.type().equals(repositoryMetaData.type()) && previousMetadata.settings().equals(repositoryMetaData.settings())) {
|
||||
if (previousMetadata.equals(repositoryMetaData)) {
|
||||
// Previous version is the same as this one - ignore it
|
||||
return false;
|
||||
}
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue