Don't set local node on cluster state used for node join validation (#23311)
When a node wants to join a cluster, it sends a join request to the master. The master then sends a join validation request to the node. This checks that the node can deserialize the current cluster state that exists on the master and that it can thus handle all the indices that are currently in the cluster (see #21830). The current code can trip an assertion as it does not take the cluster state as is but sets itself as the local node on the cluster state. This can result in an inconsistent DiscoveryNodes object as the local node is not yet part of the cluster state and a node with same id but different address can still exist in the cluster state. Also another node with the same address but different id can exist in the cluster state if multiple nodes are run on the same machine and ports have been swapped after node crashes/restarts.
This commit is contained in:
parent
708d11f54a
commit
0f88f21535
|
@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class MembershipAction extends AbstractComponent {
|
||||
|
||||
|
@ -63,8 +62,7 @@ public class MembershipAction extends AbstractComponent {
|
|||
|
||||
private final MembershipListener listener;
|
||||
|
||||
public MembershipAction(Settings settings, TransportService transportService,
|
||||
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
|
||||
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
this.listener = listener;
|
||||
|
@ -73,7 +71,7 @@ public class MembershipAction extends AbstractComponent {
|
|||
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
|
||||
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
|
||||
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
|
||||
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
|
||||
() -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
|
||||
new ValidateJoinRequestRequestHandler());
|
||||
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
|
||||
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
|
||||
|
@ -155,22 +153,18 @@ public class MembershipAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
static class ValidateJoinRequest extends TransportRequest {
|
||||
private final Supplier<DiscoveryNode> localNode;
|
||||
private ClusterState state;
|
||||
|
||||
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
|
||||
this.localNode = localNode;
|
||||
}
|
||||
ValidateJoinRequest() {}
|
||||
|
||||
ValidateJoinRequest(ClusterState state) {
|
||||
this.state = state;
|
||||
this.localNode = state.nodes()::getLocalNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
this.state = ClusterState.readFrom(in, localNode.get());
|
||||
this.state = ClusterState.readFrom(in, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -191,7 +191,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
new NewPendingClusterStateListener(),
|
||||
discoverySettings,
|
||||
clusterService.getClusterName());
|
||||
this.membership = new MembershipAction(settings, transportService, this::localNode, new MembershipListener());
|
||||
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
|
||||
this.joinThreadControl = new JoinThreadControl();
|
||||
|
||||
transportService.registerRequestHandler(
|
||||
|
|
Loading…
Reference in New Issue