Zen2: Add join validation (#37203)

Adds join validation to Zen2, which prevents a node from joining a cluster when the node does not
have the right ES version or does not satisfy any other of the join validation constraints.
This commit is contained in:
Yannick Welsch 2019-01-10 12:57:50 +01:00 committed by GitHub
parent cd608848e7
commit d499233068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 234 additions and 50 deletions

View File

@ -71,12 +71,14 @@ import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -119,6 +121,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final LeaderChecker leaderChecker;
private final FollowersChecker followersChecker;
private final ClusterApplier clusterApplier;
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators;
@Nullable
private Releasable electionScheduler;
@Nullable
@ -141,13 +144,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider,
ClusterApplier clusterApplier, Random random) {
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {
super(settings);
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm);
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this.persistedStateSupplier = persistedStateSupplier;
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@ -279,6 +283,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
+ lastKnownLeader + ", rejecting");
}
if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) {
// only do join validation if we have not accepted state from this master yet
onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState()));
}
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
@ -391,6 +400,41 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);
transportService.connectToNode(joinRequest.getSourceNode());
final ClusterState stateForJoinValidation = getStateForMasterService();
if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
stateForJoinValidation.getNodes().getMinNodeVersion());
}
// validate the join on the joining node, will throw a failure if it fails the validation
joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
try {
processJoinRequest(joinRequest, joinCallback);
} catch (Exception e) {
joinCallback.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]",
joinRequest.getSourceNode()), e);
joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
}
});
} else {
processJoinRequest(joinRequest, joinCallback);
}
}
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
final CoordinationState coordState = coordinationState.get();
@ -516,7 +560,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
// visible for testing
public DiscoveryNode getLocalNode() {
DiscoveryNode getLocalNode() {
return transportService.getLocalNode();
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
@ -40,15 +41,18 @@ import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -64,6 +68,7 @@ public class JoinHelper {
private static final Logger logger = LogManager.getLogger(JoinHelper.class);
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate";
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";
// the timeout for each join attempt
@ -80,7 +85,8 @@ public class JoinHelper {
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm) {
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
@ -123,9 +129,19 @@ public class JoinHelper {
channel.sendResponse(Empty.INSTANCE);
});
transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
(request, channel, task) -> {
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
channel.sendResponse(Empty.INSTANCE);
});
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: implement join validation
MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
(request, channel, task) -> {
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
channel.sendResponse(Empty.INSTANCE);
});
transportService.registerRequestHandler(
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME,
@ -244,6 +260,29 @@ public class JoinHelper {
});
}
public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
final String actionName;
if (Coordinator.isZen1Node(node)) {
actionName = MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME;
} else {
actionName = VALIDATE_JOIN_ACTION_NAME;
}
transportService.sendRequest(node, actionName,
new MembershipAction.ValidateJoinRequest(state),
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
}
public interface JoinCallback {
void onSuccess();

View File

@ -31,7 +31,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.discovery.DiscoverySettings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -259,4 +263,15 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
"All nodes in the cluster are of a higher major [" + clusterMajor + "].");
}
}
public static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
validators.add((node, state) -> {
ensureNodesCompatibility(node.getVersion(), state.getNodes());
ensureIndexCompatibility(node.getVersion(), state.getMetaData());
});
validators.addAll(onJoinValidators);
return Collections.unmodifiableCollection(validators);
}
}

View File

@ -389,7 +389,13 @@ public class PublicationTransportHandler {
in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
final ClusterState incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
final ClusterState incomingState;
try {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
} catch (Exception e){
logger.warn("unexpected error while deserializing an incoming cluster state", e);
throw e;
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
@ -400,10 +406,20 @@ public class PublicationTransportHandler {
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug("received diff for but don't have any local cluster state - requesting full state");
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw new IncompatibleClusterStateVersionException("have no local cluster state");
} else {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
final ClusterState incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
final ClusterState incomingState;
try {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} catch (Exception e){
logger.warn("unexpected error while deserializing an incoming cluster state", e);
throw e;
}
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
@ -412,12 +428,6 @@ public class PublicationTransportHandler {
return response;
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
throw e;
} finally {
IOUtils.close(in);
}

View File

@ -127,11 +127,11 @@ public class DiscoveryModule {
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put(ZEN_DISCOVERY_TYPE,
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState));
clusterSettings, hostsProvider, allocationService, joinValidators, gatewayMetaState));
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier,
Randomness.get()));
joinValidators, Randomness.get()));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier,
gatewayMetaState));
for (DiscoveryPlugin plugin : plugins) {

View File

@ -164,7 +164,7 @@ public class MembershipAction {
public ValidateJoinRequest() {}
ValidateJoinRequest(ClusterState state) {
public ValidateJoinRequest(ClusterState state) {
this.state = state;
}
@ -179,6 +179,10 @@ public class MembershipAction {
super.writeTo(out);
this.state.writeTo(out);
}
public ClusterState getState() {
return state;
}
}
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {

View File

@ -73,7 +73,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@ -163,7 +162,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState) {
super(settings);
this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
this.clusterApplier = clusterApplier;
this.transportService = transportService;
@ -235,17 +234,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
}
static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode,ClusterState>> onJoinValidators) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
validators.add((node, state) -> {
JoinTaskExecutor.ensureNodesCompatibility(node.getVersion(), state.getNodes());
JoinTaskExecutor.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
});
validators.addAll(onJoinValidators);
return Collections.unmodifiableCollection(validators);
}
// protected to allow overriding in tests
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {

View File

@ -47,6 +47,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -96,7 +97,7 @@ public class TransportBootstrapClusterActionTests extends ESTestCase {
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
new MasterService("local", Settings.EMPTY, threadPool),
() -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName("cluster")).build()), r -> emptyList(),
new NoOpClusterApplier(), new Random(random().nextLong()));
new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong()));
}
public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {

View File

@ -133,7 +133,7 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
ESAllocationTestCase.createAllocationService(settings),
new MasterService("local", settings, threadPool),
() -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()), r -> emptyList(),
new NoOpClusterApplier(), new Random(random().nextLong()));
new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong()));
}
public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {

View File

@ -63,6 +63,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@ -73,6 +74,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@ -974,6 +976,67 @@ public class CoordinatorTests extends ESTestCase {
// TODO reboot the leader and verify that the same block is applied when it restarts
}
public void testNodeCannotJoinIfJoinValidationFailsOnMaster() {
final Cluster cluster = new Cluster(randomIntBetween(1, 3));
cluster.runRandomly();
cluster.stabilise();
// check that if node join validation fails on master, the nodes can't join
List<ClusterNode> addedNodes = cluster.addNodes(randomIntBetween(1, 2));
final Set<DiscoveryNode> validatedNodes = new HashSet<>();
cluster.getAnyLeader().extraJoinValidators.add((discoveryNode, clusterState) -> {
validatedNodes.add(discoveryNode);
throw new IllegalArgumentException("join validation failed");
});
final long previousClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version();
cluster.runFor(10000, "failing join validation");
assertEquals(validatedNodes, addedNodes.stream().map(ClusterNode::getLocalNode).collect(Collectors.toSet()));
assertTrue(addedNodes.stream().allMatch(ClusterNode::isCandidate));
final long newClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version();
assertEquals(previousClusterStateVersion, newClusterStateVersion);
cluster.getAnyLeader().extraJoinValidators.clear();
cluster.stabilise();
}
public void testNodeCannotJoinIfJoinValidationFailsOnJoiningNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 3));
cluster.runRandomly();
cluster.stabilise();
// check that if node join validation fails on joining node, the nodes can't join
List<ClusterNode> addedNodes = cluster.addNodes(randomIntBetween(1, 2));
final Set<DiscoveryNode> validatedNodes = new HashSet<>();
addedNodes.stream().forEach(cn -> cn.extraJoinValidators.add((discoveryNode, clusterState) -> {
validatedNodes.add(discoveryNode);
throw new IllegalArgumentException("join validation failed");
}));
final long previousClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version();
cluster.runFor(10000, "failing join validation");
assertEquals(validatedNodes, addedNodes.stream().map(ClusterNode::getLocalNode).collect(Collectors.toSet()));
assertTrue(addedNodes.stream().allMatch(ClusterNode::isCandidate));
final long newClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version();
assertEquals(previousClusterStateVersion, newClusterStateVersion);
addedNodes.stream().forEach(cn -> cn.extraJoinValidators.clear());
cluster.stabilise();
}
public void testClusterCannotFormWithFailingJoinValidation() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
// fail join validation on a majority of nodes in the initial configuration
randomValueOtherThanMany(nodes ->
cluster.initialConfiguration.hasQuorum(
nodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getId).collect(Collectors.toSet())) == false,
() -> randomSubsetOf(cluster.clusterNodes))
.forEach(cn -> cn.extraJoinValidators.add((discoveryNode, clusterState) -> {
throw new IllegalArgumentException("join validation failed");
}));
cluster.bootstrapIfNecessary();
cluster.runFor(10000, "failing join validation");
assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0));
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
@ -1061,8 +1124,8 @@ public class CoordinatorTests extends ESTestCase {
initialNodeCount, masterEligibleNodeIds, initialConfiguration);
}
void addNodesAndStabilise(int newNodesCount) {
addNodes(newNodesCount);
List<ClusterNode> addNodesAndStabilise(int newNodesCount) {
final List<ClusterNode> addedNodes = addNodes(newNodesCount);
stabilise(
// The first pinging discovers the master
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
@ -1072,16 +1135,20 @@ public class CoordinatorTests extends ESTestCase {
// followup reconfiguration
+ newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
// TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
return addedNodes;
}
void addNodes(int newNodesCount) {
List<ClusterNode> addNodes(int newNodesCount) {
logger.info("--> adding {} nodes", newNodesCount);
final int nodeSizeAtStart = clusterNodes.size();
final List<ClusterNode> addedNodes = new ArrayList<>();
for (int i = 0; i < newNodesCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true);
clusterNodes.add(clusterNode);
addedNodes.add(clusterNode);
}
clusterNodes.addAll(addedNodes);
return addedNodes;
}
int size() {
@ -1219,15 +1286,7 @@ public class CoordinatorTests extends ESTestCase {
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
assertFalse("stabilisation requires stable storage", disruptStorage);
if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
}
bootstrapIfNecessary();
runFor(stabilisationDurationMillis, "stabilising");
@ -1293,6 +1352,18 @@ public class CoordinatorTests extends ESTestCase {
leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState));
}
void bootstrapIfNecessary() {
if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
}
}
void runFor(long runDurationMillis, String description) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);
@ -1421,6 +1492,7 @@ public class CoordinatorTests extends ESTestCase {
private AckedFakeThreadPoolMasterService masterService;
private TransportService transportService;
private DisruptableMockTransport mockTransport;
private List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
ClusterNode(int nodeIndex, boolean masterEligible) {
@ -1495,9 +1567,11 @@ public class CoordinatorTests extends ESTestCase {
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get());
Cluster.this::provideUnicastHosts, clusterApplier, onJoinValidators, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
transportService.start();
@ -1523,6 +1597,10 @@ public class CoordinatorTests extends ESTestCase {
return coordinator.getMode() == LEADER;
}
boolean isCandidate() {
return coordinator.getMode() == CANDIDATE;
}
ClusterState improveConfiguration(ClusterState currentState) {
synchronized (coordinator.mutex) {
return coordinator.improveConfiguration(currentState);

View File

@ -44,7 +44,8 @@ public class JoinHelperTests extends ESTestCase {
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); });
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList());
transportService.start();
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);

View File

@ -157,6 +157,8 @@ public class NodeJoinTests extends ESTestCase {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(),
destination.getVersion()));
} else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) {
handleResponse(requestId, new TransportResponse.Empty());
} else {
super.onSendRequest(requestId, action, request, destination);
}
@ -173,6 +175,7 @@ public class NodeJoinTests extends ESTestCase {
masterService,
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random);
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -388,7 +389,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler
(() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList()));
(() -> localNode, JoinTaskExecutor.addBuiltInJoinValidators(Collections.emptyList()));
final boolean incompatible = randomBoolean();
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
.put(SETTING_VERSION_CREATED,

View File

@ -81,7 +81,7 @@ public class TestZenDiscovery extends ZenDiscovery {
return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, namedWriteableRegistry,
allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider,
clusterApplier, new Random(Randomness.get().nextLong()));
clusterApplier, Collections.emptyList(), new Random(Randomness.get().nextLong()));
} else {
return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService,
clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState);