From 986bf52d1f3a3dc38ee4a29e1c8b3f5c7ef55126 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 30 Nov 2018 18:53:08 +0100 Subject: [PATCH] [Zen2] Allow Setting a List of Bootstrap Nodes to Wait for (#35847) --- .../bootstrap/GetDiscoveredNodesRequest.java | 28 ++- .../TransportGetDiscoveredNodesAction.java | 46 ++++- .../coordination/ClusterBootstrapService.java | 9 + .../common/settings/ClusterSettings.java | 1 + ...ransportGetDiscoveredNodesActionTests.java | 194 ++++++++++++++---- .../client/transport/TransportClientIT.java | 2 +- 6 files changed, 238 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java index ffd031785cf..28611869475 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import java.util.Collections; +import java.util.List; /** * Request the set of master-eligible nodes discovered by this node. Most useful in a brand-new cluster as a precursor to setting the @@ -38,6 +40,8 @@ public class GetDiscoveredNodesRequest extends ActionRequest { @Nullable // if the request should wait indefinitely private TimeValue timeout = TimeValue.timeValueSeconds(30); + private List requiredNodes = Collections.emptyList(); + public GetDiscoveredNodesRequest() { } @@ -45,6 +49,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest { super(in); waitForNodes = in.readInt(); timeout = in.readOptionalTimeValue(); + requiredNodes = in.readList(StreamInput::readString); } /** @@ -95,6 +100,26 @@ public class GetDiscoveredNodesRequest extends ActionRequest { return timeout; } + /** + * Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes. + * This parameter gives the names or transport addresses of the expected nodes. + * + * @return list of expected nodes + */ + public List getRequiredNodes() { + return requiredNodes; + } + + /** + * Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes. + * This parameter gives the names or transport addresses of the expected nodes. + * + * @param requiredNodes list of expected nodes + */ + public void setRequiredNodes(final List requiredNodes) { + this.requiredNodes = requiredNodes; + } + @Override public ActionRequestValidationException validate() { return null; @@ -110,6 +135,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest { super.writeTo(out); out.writeInt(waitForNodes); out.writeOptionalTimeValue(timeout); + out.writeStringList(requiredNodes); } @Override @@ -117,6 +143,6 @@ public class GetDiscoveredNodesRequest extends ActionRequest { return "GetDiscoveredNodesRequest{" + "waitForNodes=" + waitForNodes + ", timeout=" + timeout + - '}'; + ", requiredNodes=" + requiredNodes + "}"; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java index c4d979300fa..7951a926c10 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java @@ -37,11 +37,14 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; +import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; @@ -93,8 +96,12 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction= request.getWaitForNodes() && listenerNotified.compareAndSet(false, true)) { - listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet)); + try { + if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) { + listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet)); + } + } catch (Exception e) { + listenableFuture.onFailure(e); } } @@ -124,4 +131,39 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction nodes) { + if (nodes.size() < request.getWaitForNodes()) { + return false; + } + + List requirements = request.getRequiredNodes(); + final Set selectedNodes = new HashSet<>(); + for (final String requirement : requirements) { + final Set matchingNodes + = nodes.stream().filter(n -> matchesRequirement(n, requirement)).collect(Collectors.toSet()); + + if (matchingNodes.isEmpty()) { + return false; + } + if (matchingNodes.size() > 1) { + throw new IllegalArgumentException("[" + requirement + "] matches " + matchingNodes); + } + + for (final DiscoveryNode matchingNode : matchingNodes) { + if (selectedNodes.add(matchingNode) == false) { + throw new IllegalArgumentException("[" + matchingNode + "] matches " + + requirements.stream().filter(r -> matchesRequirement(matchingNode, requirement)).collect(Collectors.toList())); + } + } + } + + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index 572631a8f61..093d3a2bf0e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -41,6 +41,9 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; public class ClusterBootstrapService { @@ -51,12 +54,17 @@ public class ClusterBootstrapService { public static final Setting INITIAL_MASTER_NODE_COUNT_SETTING = Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope); + public static final Setting> INITIAL_MASTER_NODES_SETTING = + Setting.listSetting("cluster.initial_master_nodes", Collections.emptyList(), Function.identity(), Property.NodeScope); + private final int initialMasterNodeCount; + private final List initialMasterNodes; private final TransportService transportService; private volatile boolean running; public ClusterBootstrapService(Settings settings, TransportService transportService) { initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings); + initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings); this.transportService = transportService; } @@ -73,6 +81,7 @@ public class ClusterBootstrapService { final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest(); request.setWaitForNodes(initialMasterNodeCount); + request.setRequiredNodes(initialMasterNodes); request.setTimeout(null); logger.trace("sending {}", request); transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request, diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 69c63682b9d..0a6335ebc49 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -470,6 +470,7 @@ public final class ClusterSettings extends AbstractScopedSettings { LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING, Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION, TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING, + ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING, ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING, LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING ))); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index ba663e76b46..2d57f478cad 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -51,6 +51,9 @@ import org.junit.Before; import org.junit.BeforeClass; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -92,8 +95,10 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { @Before public void setupTest() { clusterName = randomAlphaOfLength(10); - localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); - otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + localNode = new DiscoveryNode( + "node1", "local", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); + otherNode = new DiscoveryNode( + "node2", "other", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); final MockTransport transport = new MockTransport() { @Override @@ -220,13 +225,114 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { } } - public void testGetsDiscoveredNodes() throws InterruptedException { + public void testGetsDiscoveredNodesWithZeroTimeout() throws InterruptedException { + setupGetDiscoveredNodesAction(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + assertWaitConditionMet(getDiscoveredNodesRequest); + } + + public void testGetsDiscoveredNodesByAddress() throws InterruptedException { + setupGetDiscoveredNodesAction(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), otherNode.getAddress().toString())); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + assertWaitConditionMet(getDiscoveredNodesRequest); + } + + public void testGetsDiscoveredNodesByName() throws InterruptedException { + setupGetDiscoveredNodesAction(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getName(), otherNode.getName())); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + assertWaitConditionMet(getDiscoveredNodesRequest); + } + + public void testGetsDiscoveredNodesByIP() throws InterruptedException { + setupGetDiscoveredNodesAction(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + String ip = localNode.getAddress().getAddress(); + getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip)); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, '[' + ip + "] matches ["); + } + + public void testGetsDiscoveredNodesDuplicateName() throws InterruptedException { + setupGetDiscoveredNodesAction(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + String name = localNode.getName(); + getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(name, name)); + getDiscoveredNodesRequest.setWaitForNodes(1); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, "[" + localNode + "] matches [" + name + ", " + name + ']'); + } + + public void testGetsDiscoveredNodesWithDuplicateMatchNameAndAddress() throws InterruptedException { + setupGetDiscoveredNodesAction(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), localNode.getName())); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, "[" + localNode + "] matches ["); + } + + public void testGetsDiscoveredNodesTimeoutOnMissing() throws InterruptedException { + setupGetDiscoveredNodesAction(); + + final CountDownLatch latch = new CountDownLatch(1); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), "_missing")); + getDiscoveredNodesRequest.setWaitForNodes(1); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + assertThat(exp.getRootCause(), instanceOf(ElasticsearchTimeoutException.class)); + latch.countDown(); + } + }); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testThrowsExceptionIfDuplicateDiscoveredLater() throws InterruptedException { new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action transportService.start(); transportService.acceptIncomingRequests(); coordinator.start(); coordinator.startInitialJoin(); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + final String ip = localNode.getAddress().getAddress(); + getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip)); + getDiscoveredNodesRequest.setWaitForNodes(2); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + Throwable t = exp.getRootCause(); + assertThat(t, instanceOf(IllegalArgumentException.class)); + assertThat(t.getMessage(), startsWith('[' + ip + "] matches [")); + countDownLatch.countDown(); + } + }); + + executeRequestPeersAction(); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + private void executeRequestPeersAction() { threadPool.generic().execute(() -> transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(otherNode, emptyList()), new TransportResponseHandler() { @@ -248,47 +354,59 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { return Names.SAME; } })); + } - { - final CountDownLatch countDownLatch = new CountDownLatch(1); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setWaitForNodes(2); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode)); - countDownLatch.countDown(); - } + private void setupGetDiscoveredNodesAction() throws InterruptedException { + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + coordinator.startInitialJoin(); - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - }); + executeRequestPeersAction(); - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + assertWaitConditionMet(getDiscoveredNodesRequest); + } - { - final CountDownLatch countDownLatch = new CountDownLatch(1); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setWaitForNodes(2); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode)); - countDownLatch.countDown(); - } + private void assertWaitConditionMet(GetDiscoveredNodesRequest getDiscoveredNodesRequest) throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode)); + countDownLatch.countDown(); + } - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - }); + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + private void assertWaitConditionFailedOnDuplicate(GetDiscoveredNodesRequest getDiscoveredNodesRequest, String message) + throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + Throwable t = exp.getRootCause(); + assertThat(t, instanceOf(IllegalArgumentException.class)); + assertThat(t.getMessage(), startsWith(message)); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); } private abstract class ResponseHandler implements TransportResponseHandler { diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java index 61a86b2665c..709af1dfe3c 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java @@ -67,7 +67,7 @@ public class TransportClientIT extends ESIntegTestCase { .put(Node.NODE_DATA_SETTING.getKey(), false) .put("cluster.name", "foobar") .put(TestZenDiscovery.USE_ZEN2.getKey(), getUseZen2()) - .put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1) + .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), "testNodeVersionIsUpdated") .build(), Arrays.asList(getTestTransportPlugin(), TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class)).start()) { TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();