From a9a2753f0b07ee644e1babfd9de4f00df17e7b2a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 22 Nov 2016 20:46:28 +0100 Subject: [PATCH] Add a HostFailureListener to notify client code if a node got disconnected (#21709) Today there is no way to get notified if a node is disconnected. Client code must poll the TransportClient constantly to detect that a node is not connected anymore in order to react and add new nodes or notify altering etc. For instance if a hostname gets resolved to an IP but that host is disconnected clients want to reconnect by resolving the hostname again which is a common situation in cloud environments. Closes #21424 --- .../resources/checkstyle_suppressions.xml | 2 - .../client/PreBuiltTransportClient.java | 25 +++++- .../client/transport/TransportClient.java | 43 ++++++++-- .../TransportClientNodesService.java | 70 ++++++++++------- .../{support => }/TransportProxyClient.java | 13 ++-- .../elasticsearch/cluster/ClusterState.java | 12 ++- .../common/settings/ClusterSettings.java | 10 +-- .../client/transport/NodeDisconnectIT.java | 78 +++++++++++++++++++ .../TransportClientNodesServiceTests.java | 2 +- .../transport/MockTransportClient.java | 6 +- 10 files changed, 207 insertions(+), 54 deletions(-) rename core/src/main/java/org/elasticsearch/client/transport/{support => }/TransportProxyClient.java (79%) create mode 100644 core/src/test/java/org/elasticsearch/client/transport/NodeDisconnectIT.java diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 28443f7ff47..83b4e332cbf 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -223,8 +223,6 @@ - - diff --git a/client/transport/src/main/java/org/elasticsearch/transport/client/PreBuiltTransportClient.java b/client/transport/src/main/java/org/elasticsearch/transport/client/PreBuiltTransportClient.java index ff4070042d2..ca90723ae82 100644 --- a/client/transport/src/main/java/org/elasticsearch/transport/client/PreBuiltTransportClient.java +++ b/client/transport/src/main/java/org/elasticsearch/transport/client/PreBuiltTransportClient.java @@ -55,13 +55,36 @@ public class PreBuiltTransportClient extends TransportClient { PercolatorPlugin.class, MustachePlugin.class)); + + /** + * Creates a new transport client with pre-installed plugins. + * @param settings the settings passed to this transport client + * @param plugins an optional array of additional plugins to run with this client + */ @SafeVarargs public PreBuiltTransportClient(Settings settings, Class... plugins) { this(settings, Arrays.asList(plugins)); } + + /** + * Creates a new transport client with pre-installed plugins. + * @param settings the settings passed to this transport client + * @param plugins a collection of additional plugins to run with this client + */ public PreBuiltTransportClient(Settings settings, Collection> plugins) { - super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS)); + this(settings, plugins, null); + } + + /** + * Creates a new transport client with pre-installed plugins. + * @param settings the settings passed to this transport client + * @param plugins a collection of additional plugins to run with this client + * @param hostFailureListener a failure listener that is invoked if a node is disconnected. This can be null + */ + public PreBuiltTransportClient(Settings settings, Collection> plugins, + HostFailureListener hostFailureListener) { + super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), hostFailureListener); } @Override diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 673693c7c38..f680c336e35 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.support.AbstractClient; -import org.elasticsearch.client.transport.support.TransportProxyClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; @@ -40,6 +39,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.Node; @@ -65,6 +65,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; + /** * The transport client allows to create a client that is not part of the cluster, but simply connects to one * or more nodes directly by adding their respective addresses using {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}. @@ -74,6 +76,15 @@ import java.util.stream.Collectors; */ public abstract class TransportClient extends AbstractClient { + public static final Setting CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL = + Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Setting.Property.NodeScope); + public static final Setting CLIENT_TRANSPORT_PING_TIMEOUT = + Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Setting.Property.NodeScope); + public static final Setting CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME = + Setting.boolSetting("client.transport.ignore_cluster_name", false, Setting.Property.NodeScope); + public static final Setting CLIENT_TRANSPORT_SNIFF = + Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope); + private static PluginsService newPluginService(final Settings settings, Collection> plugins) { final Settings.Builder settingsBuilder = Settings.builder() .put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval @@ -101,7 +112,7 @@ public abstract class TransportClient extends AbstractClient { } private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings, - Collection> plugins) { + Collection> plugins, HostFailureListener failureListner) { if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) { providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build(); } @@ -164,7 +175,8 @@ public abstract class TransportClient extends AbstractClient { Injector injector = modules.createInjector(); final TransportClientNodesService nodesService = - new TransportClientNodesService(settings, transportService, threadPool); + new TransportClientNodesService(settings, transportService, threadPool, failureListner == null + ? (t, e) -> {} : failureListner); final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService, actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList())); @@ -222,7 +234,7 @@ public abstract class TransportClient extends AbstractClient { * Creates a new TransportClient with the given settings and plugins */ public TransportClient(Settings settings, Collection> plugins) { - this(buildTemplate(settings, Settings.EMPTY, plugins)); + this(buildTemplate(settings, Settings.EMPTY, plugins, null)); } /** @@ -231,8 +243,9 @@ public abstract class TransportClient extends AbstractClient { * @param defaultSettings default settings that are merged after the plugins have added it's additional settings. * @param plugins the client plugins */ - protected TransportClient(Settings settings, Settings defaultSettings, Collection> plugins) { - this(buildTemplate(settings, defaultSettings, plugins)); + protected TransportClient(Settings settings, Settings defaultSettings, Collection> plugins, + HostFailureListener hostFailureListener) { + this(buildTemplate(settings, defaultSettings, plugins, hostFailureListener)); } private TransportClient(ClientTemplate template) { @@ -332,4 +345,22 @@ public abstract class TransportClient extends AbstractClient { protected > void doExecute(Action action, Request request, ActionListener listener) { proxy.execute(action, request, listener); } + + /** + * Listener that allows to be notified whenever a node failure / disconnect happens + */ + @FunctionalInterface + public interface HostFailureListener { + /** + * Called once a node disconnect is detected. + * @param node the node that has been disconnected + * @param ex the exception causing the disconnection + */ + void onNodeDisconnected(DiscoveryNode node, Exception ex); + } + + // pkg private for testing + TransportClientNodesService getNodesService() { + return nodesService; + } } diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index f36e9c60526..b403a30ecd8 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -35,8 +35,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -45,6 +43,8 @@ import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.FutureTransportResponseHandler; +import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; @@ -64,9 +64,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; - -public class TransportClientNodesService extends AbstractComponent implements Closeable { +final class TransportClientNodesService extends AbstractComponent implements Closeable { private final TimeValue nodesSamplerInterval; @@ -100,37 +98,30 @@ public class TransportClientNodesService extends AbstractComponent implements Cl private volatile boolean closed; + private final TransportClient.HostFailureListener hostFailureListener; - public static final Setting CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL = - Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Property.NodeScope); - public static final Setting CLIENT_TRANSPORT_PING_TIMEOUT = - Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Property.NodeScope); - public static final Setting CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME = - Setting.boolSetting("client.transport.ignore_cluster_name", false, Property.NodeScope); - public static final Setting CLIENT_TRANSPORT_SNIFF = - Setting.boolSetting("client.transport.sniff", false, Property.NodeScope); - - public TransportClientNodesService(Settings settings,TransportService transportService, - ThreadPool threadPool) { + TransportClientNodesService(Settings settings, TransportService transportService, + ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) { super(settings); this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.transportService = transportService; this.threadPool = threadPool; this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion(); - this.nodesSamplerInterval = CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings); - this.pingTimeout = CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis(); - this.ignoreClusterName = CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings); + this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings); + this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis(); + this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings); if (logger.isDebugEnabled()) { logger.debug("node_sampler_interval[{}]", nodesSamplerInterval); } - if (CLIENT_TRANSPORT_SNIFF.get(this.settings)) { + if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) { this.nodesSampler = new SniffNodesSampler(); } else { this.nodesSampler = new SimpleNodeSampler(); } + this.hostFailureListener = hostFailureListener; this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler()); } @@ -224,13 +215,17 @@ public class TransportClientNodesService extends AbstractComponent implements Cl } ensureNodesAreAvailable(nodes); int index = getNodeNumber(); - RetryListener retryListener = new RetryListener<>(callback, listener, nodes, index); - DiscoveryNode node = nodes.get((index) % nodes.size()); + RetryListener retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener); + DiscoveryNode node = retryListener.getNode(0); try { callback.doWithNode(node, retryListener); } catch (Exception e) { - //this exception can't come from the TransportService as it doesn't throw exception at all - listener.onFailure(e); + try { + //this exception can't come from the TransportService as it doesn't throw exception at all + listener.onFailure(e); + } finally { + retryListener.maybeNodeFailed(node, e); + } } } @@ -239,15 +234,17 @@ public class TransportClientNodesService extends AbstractComponent implements Cl private final ActionListener listener; private final List nodes; private final int index; + private final TransportClient.HostFailureListener hostFailureListener; private volatile int i; public RetryListener(NodeListenerCallback callback, ActionListener listener, - List nodes, int index) { + List nodes, int index, TransportClient.HostFailureListener hostFailureListener) { this.callback = callback; this.listener = listener; this.nodes = nodes; this.index = index; + this.hostFailureListener = hostFailureListener; } @Override @@ -257,13 +254,15 @@ public class TransportClientNodesService extends AbstractComponent implements Cl @Override public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { + Throwable throwable = ExceptionsHelper.unwrapCause(e); + if (throwable instanceof ConnectTransportException) { + maybeNodeFailed(getNode(this.i), (ConnectTransportException) throwable); int i = ++this.i; if (i >= nodes.size()) { listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e)); } else { try { - callback.doWithNode(nodes.get((index + i) % nodes.size()), this); + callback.doWithNode(getNode(i), this); } catch(final Exception inner) { inner.addSuppressed(e); // this exception can't come from the TransportService as it doesn't throw exceptions at all @@ -275,7 +274,15 @@ public class TransportClientNodesService extends AbstractComponent implements Cl } } + final DiscoveryNode getNode(int i) { + return nodes.get((index + i) % nodes.size()); + } + final void maybeNodeFailed(DiscoveryNode node, Exception ex) { + if (ex instanceof NodeDisconnectedException || ex instanceof NodeNotConnectedException) { + hostFailureListener.onNodeDisconnected(node, ex); + } + } } @Override @@ -377,6 +384,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl logger.debug( (Supplier) () -> new ParameterizedMessage("failed to connect to node [{}], removed from nodes list", listedNode), e); + hostFailureListener.onNodeDisconnected(listedNode, e); newFilteredNodes.add(listedNode); continue; } @@ -411,6 +419,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl logger.info( (Supplier) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e); transportService.disconnectFromNode(listedNode); + hostFailureListener.onNodeDisconnected(listedNode, e); } } @@ -489,6 +498,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl "failed to get local cluster state for {}, disconnecting...", listedNode), e); transportService.disconnectFromNode(listedNode); latch.countDown(); + hostFailureListener.onNodeDisconnected(listedNode, e); } }); } catch (Exception e) { @@ -497,6 +507,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl "failed to get local cluster state info for {}, disconnecting...", listedNode), e); transportService.disconnectFromNode(listedNode); latch.countDown(); + hostFailureListener.onNodeDisconnected(listedNode, e); } } }); @@ -531,4 +542,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl void doWithNode(DiscoveryNode node, ActionListener listener); } + + // pkg private for testing + void doSample() { + nodesSampler.doSample(); + } } diff --git a/core/src/main/java/org/elasticsearch/client/transport/support/TransportProxyClient.java b/core/src/main/java/org/elasticsearch/client/transport/TransportProxyClient.java similarity index 79% rename from core/src/main/java/org/elasticsearch/client/transport/support/TransportProxyClient.java rename to core/src/main/java/org/elasticsearch/client/transport/TransportProxyClient.java index 31af25a494a..5436bef172a 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/support/TransportProxyClient.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportProxyClient.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.client.transport.support; +package org.elasticsearch.client.transport; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -26,9 +26,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.TransportActionNodeProxy; -import org.elasticsearch.client.transport.TransportClientNodesService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.transport.TransportService; @@ -38,12 +35,12 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; -public class TransportProxyClient { +final class TransportProxyClient { private final TransportClientNodesService nodesService; private final Map proxies; - public TransportProxyClient(Settings settings, TransportService transportService, + TransportProxyClient(Settings settings, TransportService transportService, TransportClientNodesService nodesService, List actions) { this.nodesService = nodesService; Map proxies = new HashMap<>(); @@ -55,7 +52,9 @@ public class TransportProxyClient { this.proxies = unmodifiableMap(proxies); } - public > void execute(final Action action, final Request request, ActionListener listener) { + public > void execute(final Action action, + final Request request, ActionListener listener) { final TransportActionNodeProxy proxy = proxies.get(action); nodesService.execute((n, l) -> proxy.execute(n, request, l), listener); } diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index c842c57daec..7b6f2b55aa9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -143,10 +143,12 @@ public class ClusterState implements ToXContent, Diffable { private volatile RoutingNodes routingNodes; public ClusterState(long version, String stateUUID, ClusterState state) { - this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(), false); + this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(), + false); } - public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, boolean wasReadFromDiff) { + public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, + DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, boolean wasReadFromDiff) { this.version = version; this.stateUUID = stateUUID; this.clusterName = clusterName; @@ -272,12 +274,14 @@ public class ClusterState implements ToXContent, Diffable { } /** - * a cluster state supersedes another state iff they are from the same master and the version this state is higher thant the other state. + * a cluster state supersedes another state iff they are from the same master and the version this state is higher thant the other + * state. *

* In essence that means that all the changes from the other cluster state are also reflected by the current one */ public boolean supersedes(ClusterState other) { - return this.nodes().getMasterNodeId() != null && this.nodes().getMasterNodeId().equals(other.nodes().getMasterNodeId()) && this.version() > other.version(); + return this.nodes().getMasterNodeId() != null && this.nodes().getMasterNodeId().equals(other.nodes().getMasterNodeId()) + && this.version() > other.version(); } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 7e926db6e78..037109bff12 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -25,7 +25,7 @@ import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.bootstrap.BootstrapSettings; import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClientNodesService; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.InternalClusterInfoService; @@ -162,11 +162,11 @@ public final class ClusterSettings extends AbstractScopedSettings { public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>( Arrays.asList(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, - TransportClientNodesService.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL, // TODO these transport client settings are kind + TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL, // TODO these transport client settings are kind // of odd here and should only be valid if we are a transport client - TransportClientNodesService.CLIENT_TRANSPORT_PING_TIMEOUT, - TransportClientNodesService.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME, - TransportClientNodesService.CLIENT_TRANSPORT_SNIFF, + TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT, + TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME, + TransportClient.CLIENT_TRANSPORT_SNIFF, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, diff --git a/core/src/test/java/org/elasticsearch/client/transport/NodeDisconnectIT.java b/core/src/test/java/org/elasticsearch/client/transport/NodeDisconnectIT.java new file mode 100644 index 00000000000..b695b54c35e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/client/transport/NodeDisconnectIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.MockTransportClient; +import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matchers; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +public class NodeDisconnectIT extends ESIntegTestCase { + + public void testNotifyOnDisconnect() throws IOException { + internalCluster().ensureAtLeastNumDataNodes(2); + + final Set disconnectedNodes = Collections.synchronizedSet(new HashSet<>()); + try (TransportClient client = new MockTransportClient(Settings.builder() + .put("cluster.name", internalCluster().getClusterName()).build(), Collections.emptySet(), (n, e) -> disconnectedNodes.add(n))) { + for (TransportService service : internalCluster().getInstances(TransportService.class)) { + client.addTransportAddress(service.boundAddress().publishAddress()); + } + internalCluster().stopRandomDataNode(); + for (int i = 0; i < 20; i++) { // fire up requests such that we hit the node and pass it to the listener + client.admin().cluster().prepareState().get(); + } + assertEquals(1, disconnectedNodes.size()); + } + assertEquals(1, disconnectedNodes.size()); + } + + public void testNotifyOnDisconnectInSniffer() throws IOException { + internalCluster().ensureAtLeastNumDataNodes(2); + + final Set disconnectedNodes = Collections.synchronizedSet(new HashSet<>()); + try (TransportClient client = new MockTransportClient(Settings.builder() + .put("cluster.name", internalCluster().getClusterName()).build(), Collections.emptySet(), (n, e) -> disconnectedNodes.add(n))) { + int numNodes = 0; + for (TransportService service : internalCluster().getInstances(TransportService.class)) { + numNodes++; + client.addTransportAddress(service.boundAddress().publishAddress()); + } + Set discoveryNodes = client.connectedNodes().stream().map(n -> n.getAddress()).collect(Collectors.toSet()); + assertEquals(numNodes, discoveryNodes.size()); + assertEquals(0, disconnectedNodes.size()); + internalCluster().stopRandomDataNode(); + client.getNodesService().doSample(); + assertEquals(1, disconnectedNodes.size()); + assertTrue(discoveryNodes.contains(disconnectedNodes.stream().findAny().get().getAddress())); + } + assertEquals(1, disconnectedNodes.size()); + } +} diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index fcd2d113aa7..cd6cd251b34 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -101,7 +101,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); transportClientNodesService = - new TransportClientNodesService(settings, transportService, threadPool); + new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {}); this.nodesCount = randomIntBetween(1, 10); for (int i = 0; i < nodesCount; i++) { TransportAddress transportAddress = buildNewFakeTransportAddress(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java index 8338d5e5cfc..566f8ca11e7 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java @@ -37,7 +37,11 @@ public class MockTransportClient extends TransportClient { } public MockTransportClient(Settings settings, Collection> plugins) { - super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(plugins)); + this(settings, addMockTransportIfMissing(plugins), null); + } + + public MockTransportClient(Settings settings, Collection> plugins, HostFailureListener listener) { + super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(plugins), listener); } private static Collection> addMockTransportIfMissing(Collection> plugins) {