[Zen2] Allow Setting a List of Bootstrap Nodes to Wait for (#35847)
This commit is contained in:
parent
48dc6c3442
commit
986bf52d1f
|
@ -26,6 +26,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Request the set of master-eligible nodes discovered by this node. Most useful in a brand-new cluster as a precursor to setting the
|
||||
|
@ -38,6 +40,8 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
|
|||
@Nullable // if the request should wait indefinitely
|
||||
private TimeValue timeout = TimeValue.timeValueSeconds(30);
|
||||
|
||||
private List<String> requiredNodes = Collections.emptyList();
|
||||
|
||||
public GetDiscoveredNodesRequest() {
|
||||
}
|
||||
|
||||
|
@ -45,6 +49,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
|
|||
super(in);
|
||||
waitForNodes = in.readInt();
|
||||
timeout = in.readOptionalTimeValue();
|
||||
requiredNodes = in.readList(StreamInput::readString);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,6 +100,26 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
|
|||
return timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes.
|
||||
* This parameter gives the names or transport addresses of the expected nodes.
|
||||
*
|
||||
* @return list of expected nodes
|
||||
*/
|
||||
public List<String> getRequiredNodes() {
|
||||
return requiredNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes.
|
||||
* This parameter gives the names or transport addresses of the expected nodes.
|
||||
*
|
||||
* @param requiredNodes list of expected nodes
|
||||
*/
|
||||
public void setRequiredNodes(final List<String> requiredNodes) {
|
||||
this.requiredNodes = requiredNodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
|
@ -110,6 +135,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
|
|||
super.writeTo(out);
|
||||
out.writeInt(waitForNodes);
|
||||
out.writeOptionalTimeValue(timeout);
|
||||
out.writeStringList(requiredNodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,6 +143,6 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
|
|||
return "GetDiscoveredNodesRequest{" +
|
||||
"waitForNodes=" + waitForNodes +
|
||||
", timeout=" + timeout +
|
||||
'}';
|
||||
", requiredNodes=" + requiredNodes + "}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,11 +37,14 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
|
||||
|
||||
|
@ -93,9 +96,13 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction<Ge
|
|||
nodesSet.add(localNode);
|
||||
nodes.forEach(nodesSet::add);
|
||||
logger.trace("discovered {}", nodesSet);
|
||||
if (nodesSet.size() >= request.getWaitForNodes() && listenerNotified.compareAndSet(false, true)) {
|
||||
try {
|
||||
if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) {
|
||||
listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
listenableFuture.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,4 +131,39 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction<Ge
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean matchesRequirement(DiscoveryNode discoveryNode, String requirement) {
|
||||
return discoveryNode.getName().equals(requirement)
|
||||
|| discoveryNode.getAddress().toString().equals(requirement)
|
||||
|| discoveryNode.getAddress().getAddress().equals(requirement);
|
||||
}
|
||||
|
||||
private static boolean checkWaitRequirements(GetDiscoveredNodesRequest request, Set<DiscoveryNode> nodes) {
|
||||
if (nodes.size() < request.getWaitForNodes()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
List<String> requirements = request.getRequiredNodes();
|
||||
final Set<DiscoveryNode> selectedNodes = new HashSet<>();
|
||||
for (final String requirement : requirements) {
|
||||
final Set<DiscoveryNode> matchingNodes
|
||||
= nodes.stream().filter(n -> matchesRequirement(n, requirement)).collect(Collectors.toSet());
|
||||
|
||||
if (matchingNodes.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (matchingNodes.size() > 1) {
|
||||
throw new IllegalArgumentException("[" + requirement + "] matches " + matchingNodes);
|
||||
}
|
||||
|
||||
for (final DiscoveryNode matchingNode : matchingNodes) {
|
||||
if (selectedNodes.add(matchingNode) == false) {
|
||||
throw new IllegalArgumentException("[" + matchingNode + "] matches " +
|
||||
requirements.stream().filter(r -> matchesRequirement(matchingNode, requirement)).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,9 @@ import org.elasticsearch.transport.TransportResponseHandler;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class ClusterBootstrapService {
|
||||
|
||||
|
@ -51,12 +54,17 @@ public class ClusterBootstrapService {
|
|||
public static final Setting<Integer> INITIAL_MASTER_NODE_COUNT_SETTING =
|
||||
Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope);
|
||||
|
||||
public static final Setting<List<String>> INITIAL_MASTER_NODES_SETTING =
|
||||
Setting.listSetting("cluster.initial_master_nodes", Collections.emptyList(), Function.identity(), Property.NodeScope);
|
||||
|
||||
private final int initialMasterNodeCount;
|
||||
private final List<String> initialMasterNodes;
|
||||
private final TransportService transportService;
|
||||
private volatile boolean running;
|
||||
|
||||
public ClusterBootstrapService(Settings settings, TransportService transportService) {
|
||||
initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings);
|
||||
initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
|
||||
this.transportService = transportService;
|
||||
}
|
||||
|
||||
|
@ -73,6 +81,7 @@ public class ClusterBootstrapService {
|
|||
|
||||
final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
|
||||
request.setWaitForNodes(initialMasterNodeCount);
|
||||
request.setRequiredNodes(initialMasterNodes);
|
||||
request.setTimeout(null);
|
||||
logger.trace("sending {}", request);
|
||||
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
|
||||
|
|
|
@ -470,6 +470,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING,
|
||||
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
|
||||
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING,
|
||||
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING
|
||||
)));
|
||||
|
|
|
@ -51,6 +51,9 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -92,8 +95,10 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
@Before
|
||||
public void setupTest() {
|
||||
clusterName = randomAlphaOfLength(10);
|
||||
localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
localNode = new DiscoveryNode(
|
||||
"node1", "local", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
|
||||
otherNode = new DiscoveryNode(
|
||||
"node2", "other", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
|
||||
|
||||
final MockTransport transport = new MockTransport() {
|
||||
@Override
|
||||
|
@ -220,13 +225,114 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodes() throws InterruptedException {
|
||||
public void testGetsDiscoveredNodesWithZeroTimeout() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setWaitForNodes(2);
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
assertWaitConditionMet(getDiscoveredNodesRequest);
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesByAddress() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), otherNode.getAddress().toString()));
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
assertWaitConditionMet(getDiscoveredNodesRequest);
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesByName() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getName(), otherNode.getName()));
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
assertWaitConditionMet(getDiscoveredNodesRequest);
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesByIP() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
String ip = localNode.getAddress().getAddress();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip));
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, '[' + ip + "] matches [");
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesDuplicateName() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
String name = localNode.getName();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(name, name));
|
||||
getDiscoveredNodesRequest.setWaitForNodes(1);
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, "[" + localNode + "] matches [" + name + ", " + name + ']');
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesWithDuplicateMatchNameAndAddress() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), localNode.getName()));
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, "[" + localNode + "] matches [");
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesTimeoutOnMissing() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), "_missing"));
|
||||
getDiscoveredNodesRequest.setWaitForNodes(1);
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(GetDiscoveredNodesResponse response) {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp.getRootCause(), instanceOf(ElasticsearchTimeoutException.class));
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latch.await(10L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void testThrowsExceptionIfDuplicateDiscoveredLater() throws InterruptedException {
|
||||
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
coordinator.start();
|
||||
coordinator.startInitialJoin();
|
||||
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
final String ip = localNode.getAddress().getAddress();
|
||||
getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip));
|
||||
getDiscoveredNodesRequest.setWaitForNodes(2);
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(GetDiscoveredNodesResponse response) {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
Throwable t = exp.getRootCause();
|
||||
assertThat(t, instanceOf(IllegalArgumentException.class));
|
||||
assertThat(t.getMessage(), startsWith('[' + ip + "] matches ["));
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
executeRequestPeersAction();
|
||||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void executeRequestPeersAction() {
|
||||
threadPool.generic().execute(() ->
|
||||
transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(otherNode, emptyList()),
|
||||
new TransportResponseHandler<PeersResponse>() {
|
||||
|
@ -248,11 +354,24 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
return Names.SAME;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void setupGetDiscoveredNodesAction() throws InterruptedException {
|
||||
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
coordinator.start();
|
||||
coordinator.startInitialJoin();
|
||||
|
||||
executeRequestPeersAction();
|
||||
|
||||
{
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setWaitForNodes(2);
|
||||
assertWaitConditionMet(getDiscoveredNodesRequest);
|
||||
}
|
||||
|
||||
private void assertWaitConditionMet(GetDiscoveredNodesRequest getDiscoveredNodesRequest) throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(GetDiscoveredNodesResponse response) {
|
||||
|
@ -269,27 +388,26 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
{
|
||||
private void assertWaitConditionFailedOnDuplicate(GetDiscoveredNodesRequest getDiscoveredNodesRequest, String message)
|
||||
throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setWaitForNodes(2);
|
||||
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
|
||||
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(GetDiscoveredNodesResponse response) {
|
||||
assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode));
|
||||
countDownLatch.countDown();
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
throw new AssertionError("should not be called", exp);
|
||||
Throwable t = exp.getRootCause();
|
||||
assertThat(t, instanceOf(IllegalArgumentException.class));
|
||||
assertThat(t.getMessage(), startsWith(message));
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class ResponseHandler implements TransportResponseHandler<GetDiscoveredNodesResponse> {
|
||||
@Override
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TransportClientIT extends ESIntegTestCase {
|
|||
.put(Node.NODE_DATA_SETTING.getKey(), false)
|
||||
.put("cluster.name", "foobar")
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), getUseZen2())
|
||||
.put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1)
|
||||
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), "testNodeVersionIsUpdated")
|
||||
.build(), Arrays.asList(getTestTransportPlugin(), TestZenDiscovery.TestPlugin.class,
|
||||
MockHttpTransport.TestPlugin.class)).start()) {
|
||||
TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
|
||||
|
|
Loading…
Reference in New Issue