Discovery: Join process to better validate join request, closes #2019.

The join proces received on the master from a node joining the cluster now tries to validate by connecting to the relevant node. It should also send a message to it to double check that all is in order.
This commit is contained in:
Shay Banon 2012-06-11 23:44:27 +02:00
parent 2a80e14284
commit 9798d3e8fb
2 changed files with 58 additions and 1 deletions

View File

@ -540,6 +540,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
transportService.connectToNode(node);
state = clusterService.state();
// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, state, pingTimeout);
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {

View File

@ -60,11 +60,13 @@ public class MembershipAction extends AbstractComponent {
this.listener = listener;
transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler());
transportService.registerHandler(ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequestRequestHandler());
transportService.registerHandler(LeaveRequestRequestHandler.ACTION, new LeaveRequestRequestHandler());
}
public void close() {
transportService.removeHandler(JoinRequestRequestHandler.ACTION);
transportService.removeHandler(ValidateJoinRequestRequestHandler.ACTION);
transportService.removeHandler(LeaveRequestRequestHandler.ACTION);
}
@ -89,6 +91,14 @@ public class MembershipAction extends AbstractComponent {
}).txGet(timeout.millis(), TimeUnit.MILLISECONDS).clusterState;
}
/**
* Validates the join request, throwing a failure if it failed.
*/
public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState clusterState, TimeValue timeout) throws ElasticSearchException {
transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), VoidTransportResponseHandler.INSTANCE_SAME)
.txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
static class JoinRequest implements Streamable {
DiscoveryNode node;
@ -163,6 +173,49 @@ public class MembershipAction extends AbstractComponent {
}
}
class ValidateJoinRequest implements Streamable {
ClusterState clusterState;
ValidateJoinRequest() {
}
ValidateJoinRequest(ClusterState clusterState) {
this.clusterState = clusterState;
}
@Override
public void readFrom(StreamInput in) throws IOException {
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
ClusterState.Builder.writeTo(clusterState, out);
}
}
private class ValidateJoinRequestRequestHandler extends BaseTransportRequestHandler<ValidateJoinRequest> {
static final String ACTION = "discovery/zen/join/validate";
@Override
public ValidateJoinRequest newInstance() {
return new ValidateJoinRequest();
}
@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
// for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
}
private static class LeaveRequest implements Streamable {
private DiscoveryNode node;
@ -202,7 +255,7 @@ public class MembershipAction extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.SAME;
return ThreadPool.Names.GENERIC;
}
}
}