Zen2: Extract JoinTaskExecutor (#32911)
Moves JoinTaskExecutor out of the Zen discovery module (where it was nested in NodeJoinController) so that it can be reused for Zen2. Also ensures that tasks submitted to JoinTaskExecutor have a proper identity, so that multiple tasks for the same node can coexist.
This commit is contained in:
parent
6d9e7c5cec
commit
a3bb85eeaf
|
@ -0,0 +1,249 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.cluster.coordination;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecutor.Task> {
|
||||
|
||||
private final AllocationService allocationService;
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
public static class Task {
|
||||
|
||||
private final DiscoveryNode node;
|
||||
private final String reason;
|
||||
|
||||
public Task(DiscoveryNode node, String reason) {
|
||||
this.node = node;
|
||||
this.reason = reason;
|
||||
}
|
||||
|
||||
public DiscoveryNode node() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public String reason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return node != null ? node + " " + reason : reason;
|
||||
}
|
||||
}
|
||||
|
||||
public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
|
||||
this.allocationService = allocationService;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> joiningNodes) throws Exception {
|
||||
final ClusterTasksResult.Builder<Task> results = ClusterTasksResult.builder();
|
||||
|
||||
final DiscoveryNodes currentNodes = currentState.nodes();
|
||||
boolean nodesChanged = false;
|
||||
ClusterState.Builder newState;
|
||||
|
||||
if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
|
||||
return results.successes(joiningNodes).build(currentState);
|
||||
} else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
|
||||
assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
|
||||
// use these joins to try and become the master.
|
||||
// Note that we don't have to do any validation of the amount of joining nodes - the commit
|
||||
// during the cluster state publishing guarantees that we have enough
|
||||
newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
|
||||
nodesChanged = true;
|
||||
} else if (currentNodes.isLocalNodeElectedMaster() == false) {
|
||||
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
|
||||
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
|
||||
} else {
|
||||
newState = ClusterState.builder(currentState);
|
||||
}
|
||||
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
|
||||
|
||||
assert nodesBuilder.isLocalNodeElectedMaster();
|
||||
|
||||
Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
|
||||
Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
|
||||
// we only enforce major version transitions on a fully formed clusters
|
||||
final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
|
||||
// processing any joins
|
||||
for (final Task joinTask : joiningNodes) {
|
||||
if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) {
|
||||
// noop
|
||||
} else if (currentNodes.nodeExists(joinTask.node())) {
|
||||
logger.debug("received a join request for an existing node [{}]", joinTask.node());
|
||||
} else {
|
||||
final DiscoveryNode node = joinTask.node();
|
||||
try {
|
||||
if (enforceMajorVersion) {
|
||||
ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
|
||||
}
|
||||
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
|
||||
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
|
||||
// we have to reject nodes that don't support all indices we have in this cluster
|
||||
ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
|
||||
nodesBuilder.add(node);
|
||||
nodesChanged = true;
|
||||
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
|
||||
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
|
||||
} catch (IllegalArgumentException | IllegalStateException e) {
|
||||
results.failure(joinTask, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
results.success(joinTask);
|
||||
}
|
||||
if (nodesChanged) {
|
||||
newState.nodes(nodesBuilder);
|
||||
return results.build(allocationService.reroute(newState.build(), "node_join"));
|
||||
} else {
|
||||
// we must return a new cluster state instance to force publishing. This is important
|
||||
// for the joining node to finalize its join and set us as a master
|
||||
return results.build(newState.build());
|
||||
}
|
||||
}
|
||||
|
||||
private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<Task> joiningNodes) {
|
||||
assert currentState.nodes().getMasterNodeId() == null : currentState;
|
||||
DiscoveryNodes currentNodes = currentState.nodes();
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
|
||||
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
|
||||
|
||||
for (final Task joinTask : joiningNodes) {
|
||||
if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) {
|
||||
// noop
|
||||
} else {
|
||||
final DiscoveryNode joiningNode = joinTask.node();
|
||||
final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId());
|
||||
if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) {
|
||||
logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode);
|
||||
nodesBuilder.remove(nodeWithSameId.getId());
|
||||
}
|
||||
final DiscoveryNode nodeWithSameAddress = currentNodes.findByAddress(joiningNode.getAddress());
|
||||
if (nodeWithSameAddress != null && nodeWithSameAddress.equals(joiningNode) == false) {
|
||||
logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameAddress,
|
||||
joiningNode);
|
||||
nodesBuilder.remove(nodeWithSameAddress.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// now trim any left over dead nodes - either left there when the previous master stepped down
|
||||
// or removed by us above
|
||||
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
|
||||
.blocks(currentState.blocks())
|
||||
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
|
||||
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
|
||||
"removed dead nodes on election"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
// we validate that we are allowed to change the cluster state during cluster state processing
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* a task indicated that the current node should become master, if no current master is known
|
||||
*/
|
||||
public static final Task BECOME_MASTER_TASK = new Task(null, "_BECOME_MASTER_TASK_");
|
||||
|
||||
/**
|
||||
* a task that is used to signal the election is stopped and we should process pending joins.
|
||||
* it may be use in combination with {@link JoinTaskExecutor#BECOME_MASTER_TASK}
|
||||
*/
|
||||
public static final Task FINISH_ELECTION_TASK = new Task(null, "_FINISH_ELECTION_");
|
||||
|
||||
/**
|
||||
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata
|
||||
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
|
||||
* compatibility version.
|
||||
* @see Version#minimumIndexCompatibilityVersion()
|
||||
* @throws IllegalStateException if any index is incompatible with the given version
|
||||
*/
|
||||
public static void ensureIndexCompatibility(final Version nodeVersion, MetaData metaData) {
|
||||
Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion();
|
||||
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
|
||||
// closed or not we can't read mappings of these indices so we need to reject the join...
|
||||
for (IndexMetaData idxMetaData : metaData) {
|
||||
if (idxMetaData.getCreationVersion().after(nodeVersion)) {
|
||||
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
|
||||
+ idxMetaData.getCreationVersion() + " the node version is: " + nodeVersion);
|
||||
}
|
||||
if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) {
|
||||
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
|
||||
+ idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** ensures that the joining node has a version that's compatible with all current nodes*/
|
||||
public static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
|
||||
final Version minNodeVersion = currentNodes.getMinNodeVersion();
|
||||
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
|
||||
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
|
||||
/** ensures that the joining node has a version that's compatible with a given version range */
|
||||
public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
|
||||
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
|
||||
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
|
||||
"The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible.");
|
||||
}
|
||||
if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported." +
|
||||
"The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ensures that the joining node's major version is equal or higher to the minClusterNodeVersion. This is needed
|
||||
* to ensure that if the master is already fully operating under the new major version, it doesn't go back to mixed
|
||||
* version mode
|
||||
**/
|
||||
public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) {
|
||||
final byte clusterMajor = minClusterNodeVersion.major;
|
||||
if (joiningNodeVersion.major < clusterMajor) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
|
||||
"All nodes in the cluster are of a higher major [" + clusterMajor + "].");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,12 +19,8 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -199,62 +195,6 @@ public class MembershipAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata
|
||||
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
|
||||
* compatibility version.
|
||||
* @see Version#minimumIndexCompatibilityVersion()
|
||||
* @throws IllegalStateException if any index is incompatible with the given version
|
||||
*/
|
||||
static void ensureIndexCompatibility(final Version nodeVersion, MetaData metaData) {
|
||||
Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion();
|
||||
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
|
||||
// closed or not we can't read mappings of these indices so we need to reject the join...
|
||||
for (IndexMetaData idxMetaData : metaData) {
|
||||
if (idxMetaData.getCreationVersion().after(nodeVersion)) {
|
||||
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
|
||||
+ idxMetaData.getCreationVersion() + " the node version is: " + nodeVersion);
|
||||
}
|
||||
if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) {
|
||||
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
|
||||
+ idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** ensures that the joining node has a version that's compatible with all current nodes*/
|
||||
static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
|
||||
final Version minNodeVersion = currentNodes.getMinNodeVersion();
|
||||
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
|
||||
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
|
||||
/** ensures that the joining node has a version that's compatible with a given version range */
|
||||
static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
|
||||
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
|
||||
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
|
||||
"The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible.");
|
||||
}
|
||||
if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported." +
|
||||
"The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ensures that the joining node's major version is equal or higher to the minClusterNodeVersion. This is needed
|
||||
* to ensure that if the master is already fully operating under the new major version, it doesn't go back to mixed
|
||||
* version mode
|
||||
**/
|
||||
static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) {
|
||||
final byte clusterMajor = minClusterNodeVersion.major;
|
||||
if (joiningNodeVersion.major < clusterMajor) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
|
||||
"All nodes in the cluster are of a higher major [" + clusterMajor + "].");
|
||||
}
|
||||
}
|
||||
|
||||
public static class LeaveRequest extends TransportRequest {
|
||||
|
||||
private DiscoveryNode node;
|
||||
|
|
|
@ -21,24 +21,19 @@ package org.elasticsearch.discovery.zen;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -49,8 +44,6 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
/**
|
||||
* This class processes incoming join requests (passed via {@link ZenDiscovery}). Incoming nodes
|
||||
* are directly added to the cluster state or are accumulated during master election.
|
||||
|
@ -69,7 +62,12 @@ public class NodeJoinController extends AbstractComponent {
|
|||
Settings settings) {
|
||||
super(settings);
|
||||
this.masterService = masterService;
|
||||
joinTaskExecutor = new JoinTaskExecutor(allocationService, electMaster, logger);
|
||||
joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
|
||||
@Override
|
||||
public void clusterStatePublished(ClusterChangedEvent event) {
|
||||
electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -176,7 +174,7 @@ public class NodeJoinController extends AbstractComponent {
|
|||
checkPendingJoinsAndElectIfNeeded();
|
||||
} else {
|
||||
masterService.submitStateUpdateTask("zen-disco-node-join",
|
||||
node, ClusterStateTaskConfig.build(Priority.URGENT),
|
||||
new JoinTaskExecutor.Task(node, "no election context"), ClusterStateTaskConfig.build(Priority.URGENT),
|
||||
joinTaskExecutor, new JoinTaskListener(callback, logger));
|
||||
}
|
||||
}
|
||||
|
@ -250,9 +248,10 @@ public class NodeJoinController extends AbstractComponent {
|
|||
return hasEnough;
|
||||
}
|
||||
|
||||
private Map<DiscoveryNode, ClusterStateTaskListener> getPendingAsTasks() {
|
||||
Map<DiscoveryNode, ClusterStateTaskListener> tasks = new HashMap<>();
|
||||
joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(e.getKey(), new JoinTaskListener(e.getValue(), logger)));
|
||||
private Map<JoinTaskExecutor.Task, ClusterStateTaskListener> getPendingAsTasks(String reason) {
|
||||
Map<JoinTaskExecutor.Task, ClusterStateTaskListener> tasks = new HashMap<>();
|
||||
joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(
|
||||
new JoinTaskExecutor.Task(e.getKey(), reason), new JoinTaskListener(e.getValue(), logger)));
|
||||
return tasks;
|
||||
}
|
||||
|
||||
|
@ -273,19 +272,20 @@ public class NodeJoinController extends AbstractComponent {
|
|||
|
||||
innerClose();
|
||||
|
||||
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
|
||||
Map<JoinTaskExecutor.Task, ClusterStateTaskListener> tasks = getPendingAsTasks("become master");
|
||||
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
|
||||
|
||||
tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
|
||||
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
|
||||
// noop listener, the election finished listener determines result
|
||||
tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {});
|
||||
tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener);
|
||||
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
|
||||
}
|
||||
|
||||
public synchronized void closeAndProcessPending(String reason) {
|
||||
innerClose();
|
||||
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
|
||||
Map<JoinTaskExecutor.Task, ClusterStateTaskListener> tasks = getPendingAsTasks(reason);
|
||||
final String source = "zen-disco-election-stop [" + reason + "]";
|
||||
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
|
||||
tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener);
|
||||
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
|
||||
}
|
||||
|
||||
|
@ -377,152 +377,4 @@ public class NodeJoinController extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* a task indicated that the current node should become master, if no current master is known
|
||||
*/
|
||||
public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
|
||||
new TransportAddress(TransportAddress.META_ADDRESS, 0),
|
||||
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
|
||||
@Override
|
||||
public String toString() {
|
||||
return ""; // this is not really task , so don't log anything about it...
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* a task that is used to signal the election is stopped and we should process pending joins.
|
||||
* it may be use in combination with {@link #BECOME_MASTER_TASK}
|
||||
*/
|
||||
public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
|
||||
new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
|
||||
@Override
|
||||
public String toString() {
|
||||
return ""; // this is not really task , so don't log anything about it...
|
||||
}
|
||||
};
|
||||
|
||||
// visible for testing
|
||||
public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
|
||||
|
||||
private final AllocationService allocationService;
|
||||
|
||||
private final ElectMasterService electMasterService;
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
public JoinTaskExecutor(AllocationService allocationService, ElectMasterService electMasterService, Logger logger) {
|
||||
this.allocationService = allocationService;
|
||||
this.electMasterService = electMasterService;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
|
||||
final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();
|
||||
|
||||
final DiscoveryNodes currentNodes = currentState.nodes();
|
||||
boolean nodesChanged = false;
|
||||
ClusterState.Builder newState;
|
||||
|
||||
if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
|
||||
return results.successes(joiningNodes).build(currentState);
|
||||
} else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
|
||||
assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
|
||||
// use these joins to try and become the master.
|
||||
// Note that we don't have to do any validation of the amount of joining nodes - the commit
|
||||
// during the cluster state publishing guarantees that we have enough
|
||||
newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
|
||||
nodesChanged = true;
|
||||
} else if (currentNodes.isLocalNodeElectedMaster() == false) {
|
||||
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
|
||||
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
|
||||
} else {
|
||||
newState = ClusterState.builder(currentState);
|
||||
}
|
||||
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
|
||||
|
||||
assert nodesBuilder.isLocalNodeElectedMaster();
|
||||
|
||||
Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
|
||||
Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
|
||||
// we only enforce major version transitions on a fully formed clusters
|
||||
final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
|
||||
// processing any joins
|
||||
for (final DiscoveryNode node : joiningNodes) {
|
||||
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
|
||||
// noop
|
||||
} else if (currentNodes.nodeExists(node)) {
|
||||
logger.debug("received a join request for an existing node [{}]", node);
|
||||
} else {
|
||||
try {
|
||||
if (enforceMajorVersion) {
|
||||
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
|
||||
}
|
||||
MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
|
||||
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
|
||||
// we have to reject nodes that don't support all indices we have in this cluster
|
||||
MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
|
||||
nodesBuilder.add(node);
|
||||
nodesChanged = true;
|
||||
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
|
||||
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
|
||||
} catch (IllegalArgumentException | IllegalStateException e) {
|
||||
results.failure(node, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
results.success(node);
|
||||
}
|
||||
if (nodesChanged) {
|
||||
newState.nodes(nodesBuilder);
|
||||
return results.build(allocationService.reroute(newState.build(), "node_join"));
|
||||
} else {
|
||||
// we must return a new cluster state instance to force publishing. This is important
|
||||
// for the joining node to finalize its join and set us as a master
|
||||
return results.build(newState.build());
|
||||
}
|
||||
}
|
||||
|
||||
private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<DiscoveryNode> joiningNodes) {
|
||||
assert currentState.nodes().getMasterNodeId() == null : currentState;
|
||||
DiscoveryNodes currentNodes = currentState.nodes();
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
|
||||
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
|
||||
|
||||
for (final DiscoveryNode joiningNode : joiningNodes) {
|
||||
final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId());
|
||||
if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) {
|
||||
logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode);
|
||||
nodesBuilder.remove(nodeWithSameId.getId());
|
||||
}
|
||||
final DiscoveryNode nodeWithSameAddress = currentNodes.findByAddress(joiningNode.getAddress());
|
||||
if (nodeWithSameAddress != null && nodeWithSameAddress.equals(joiningNode) == false) {
|
||||
logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameAddress,
|
||||
joiningNode);
|
||||
nodesBuilder.remove(nodeWithSameAddress.getId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// now trim any left over dead nodes - either left there when the previous master stepped down
|
||||
// or removed by us above
|
||||
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
|
||||
.blocks(currentState.blocks())
|
||||
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
|
||||
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
|
||||
"removed dead nodes on election"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
// we validate that we are allowed to change the cluster state during cluster state processing
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStatePublished(ClusterChangedEvent event) {
|
||||
electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
|||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
|
@ -231,8 +232,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
Collection<BiConsumer<DiscoveryNode,ClusterState>> onJoinValidators) {
|
||||
Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
|
||||
validators.add((node, state) -> {
|
||||
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
|
||||
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
|
||||
JoinTaskExecutor.ensureNodesCompatibility(node.getVersion(), state.getNodes());
|
||||
JoinTaskExecutor.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
|
||||
});
|
||||
validators.addAll(onJoinValidators);
|
||||
return Collections.unmodifiableCollection(validators);
|
||||
|
@ -858,7 +859,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
// to ensure we fail as fast as possible.
|
||||
onJoinValidators.stream().forEach(a -> a.accept(node, state));
|
||||
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
|
||||
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
|
||||
JoinTaskExecutor.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
|
||||
}
|
||||
// try and connect to the node, if it fails, we can raise an exception back to the client...
|
||||
transportService.connectToNode(node);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -49,10 +50,10 @@ public class MembershipActionTests extends ESTestCase {
|
|||
.numberOfReplicas(1).build();
|
||||
metaBuilder.put(indexMetaData, false);
|
||||
MetaData metaData = metaBuilder.build();
|
||||
MembershipAction.ensureIndexCompatibility(Version.CURRENT, metaData);
|
||||
JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metaData);
|
||||
|
||||
expectThrows(IllegalStateException.class, () ->
|
||||
MembershipAction.ensureIndexCompatibility(VersionUtils.getPreviousVersion(Version.CURRENT),
|
||||
JoinTaskExecutor.ensureIndexCompatibility(VersionUtils.getPreviousVersion(Version.CURRENT),
|
||||
metaData));
|
||||
}
|
||||
|
||||
|
@ -67,7 +68,7 @@ public class MembershipActionTests extends ESTestCase {
|
|||
metaBuilder.put(indexMetaData, false);
|
||||
MetaData metaData = metaBuilder.build();
|
||||
expectThrows(IllegalStateException.class, () ->
|
||||
MembershipAction.ensureIndexCompatibility(Version.CURRENT,
|
||||
JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT,
|
||||
metaData));
|
||||
}
|
||||
|
||||
|
@ -84,9 +85,9 @@ public class MembershipActionTests extends ESTestCase {
|
|||
final Version tooLow = getPreviousVersion(maxNodeVersion.minimumCompatibilityVersion());
|
||||
expectThrows(IllegalStateException.class, () -> {
|
||||
if (randomBoolean()) {
|
||||
MembershipAction.ensureNodesCompatibility(tooLow, nodes);
|
||||
JoinTaskExecutor.ensureNodesCompatibility(tooLow, nodes);
|
||||
} else {
|
||||
MembershipAction.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion);
|
||||
JoinTaskExecutor.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -95,16 +96,16 @@ public class MembershipActionTests extends ESTestCase {
|
|||
Version tooHigh = incompatibleFutureVersion(minNodeVersion);
|
||||
expectThrows(IllegalStateException.class, () -> {
|
||||
if (randomBoolean()) {
|
||||
MembershipAction.ensureNodesCompatibility(tooHigh, nodes);
|
||||
JoinTaskExecutor.ensureNodesCompatibility(tooHigh, nodes);
|
||||
} else {
|
||||
MembershipAction.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion);
|
||||
JoinTaskExecutor.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (minNodeVersion.onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||
Version oldMajor = randomFrom(allVersions().stream().filter(v -> v.major < 6).collect(Collectors.toList()));
|
||||
expectThrows(IllegalStateException.class, () -> MembershipAction.ensureMajorVersionBarrier(oldMajor, minNodeVersion));
|
||||
expectThrows(IllegalStateException.class, () -> JoinTaskExecutor.ensureMajorVersionBarrier(oldMajor, minNodeVersion));
|
||||
}
|
||||
|
||||
final Version minGoodVersion = maxNodeVersion.major == minNodeVersion.major ?
|
||||
|
@ -114,9 +115,9 @@ public class MembershipActionTests extends ESTestCase {
|
|||
final Version justGood = randomVersionBetween(random(), minGoodVersion, maxCompatibleVersion(minNodeVersion));
|
||||
|
||||
if (randomBoolean()) {
|
||||
MembershipAction.ensureNodesCompatibility(justGood, nodes);
|
||||
JoinTaskExecutor.ensureNodesCompatibility(justGood, nodes);
|
||||
} else {
|
||||
MembershipAction.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion);
|
||||
JoinTaskExecutor.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -136,7 +137,7 @@ public class MembershipActionTests extends ESTestCase {
|
|||
.numberOfReplicas(1).build();
|
||||
metaBuilder.put(indexMetaData, false);
|
||||
MetaData metaData = metaBuilder.build();
|
||||
MembershipAction.ensureIndexCompatibility(Version.CURRENT,
|
||||
JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT,
|
||||
metaData);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.cluster.EmptyClusterInfoService;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
|
||||
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
||||
import org.elasticsearch.cluster.metadata.AliasValidator;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
|
@ -73,7 +74,6 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.NodeJoinController;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
|
@ -121,7 +121,7 @@ public class ClusterStateChanges extends AbstractComponent {
|
|||
private final TransportCreateIndexAction transportCreateIndexAction;
|
||||
|
||||
private final ZenDiscovery.NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
|
||||
private final NodeJoinController.JoinTaskExecutor joinTaskExecutor;
|
||||
private final JoinTaskExecutor joinTaskExecutor;
|
||||
|
||||
public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) {
|
||||
super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build());
|
||||
|
@ -201,7 +201,7 @@ public class ClusterStateChanges extends AbstractComponent {
|
|||
ElectMasterService electMasterService = new ElectMasterService(settings);
|
||||
nodeRemovalExecutor = new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService,
|
||||
s -> { throw new AssertionError("rejoin not implemented"); }, logger);
|
||||
joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(allocationService, electMasterService, logger);
|
||||
joinTaskExecutor = new JoinTaskExecutor(allocationService, logger);
|
||||
}
|
||||
|
||||
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
|
||||
|
@ -229,14 +229,16 @@ public class ClusterStateChanges extends AbstractComponent {
|
|||
}
|
||||
|
||||
public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
|
||||
return runTasks(joinTaskExecutor, clusterState, nodes);
|
||||
return runTasks(joinTaskExecutor, clusterState, nodes.stream().map(node -> new JoinTaskExecutor.Task(node, "dummy reason"))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
|
||||
List<DiscoveryNode> joinNodes = new ArrayList<>();
|
||||
joinNodes.add(NodeJoinController.BECOME_MASTER_TASK);
|
||||
joinNodes.add(NodeJoinController.FINISH_ELECTION_TASK);
|
||||
joinNodes.addAll(nodes);
|
||||
List<JoinTaskExecutor.Task> joinNodes = new ArrayList<>();
|
||||
joinNodes.add(JoinTaskExecutor.BECOME_MASTER_TASK);
|
||||
joinNodes.add(JoinTaskExecutor.FINISH_ELECTION_TASK);
|
||||
joinNodes.addAll(nodes.stream().map(node -> new JoinTaskExecutor.Task(node, "dummy reason"))
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
return runTasks(joinTaskExecutor, clusterState, joinNodes);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue