[Discovery] remove unneeded cluster state serialization during cluster join process

At the moment we serialize the cluster state in JoinResponse and ValidateJoinRequest. However this state is not used anywhere and can be removed to save on network overhead

Closes #6949
This commit is contained in:
Boaz Leskes 2014-07-21 23:00:07 +03:00
parent c74552e006
commit 690820dae3
2 changed files with 38 additions and 27 deletions

View File

@ -154,7 +154,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
}
@ -720,11 +720,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} else {
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);
ClusterState state = clusterService.state();
// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
membership.sendValidateJoinRequestBlocking(node, joinTimeout);
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
@ -755,7 +754,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
callback.onSuccess(newState);
callback.onSuccess();
}
});
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.discovery.zen.membership;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
@ -40,7 +42,7 @@ import java.util.concurrent.TimeUnit;
public class MembershipAction extends AbstractComponent {
public static interface JoinCallback {
void onSuccess(ClusterState state);
void onSuccess();
void onFailure(Throwable t);
}
@ -57,11 +59,14 @@ public class MembershipAction extends AbstractComponent {
private final MembershipListener listener;
public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
private final ClusterService clusterService;
public MembershipAction(Settings settings, ClusterService clusterService, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;
this.clusterService = clusterService;
transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler());
transportService.registerHandler(ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequestRequestHandler());
@ -83,23 +88,23 @@ public class MembershipAction extends AbstractComponent {
}
public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), EmptyTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME);
}
public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticsearchException {
return transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, true), new FutureTransportResponseHandler<JoinResponse>() {
public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticsearchException {
transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), new FutureTransportResponseHandler<JoinResponse>() {
@Override
public JoinResponse newInstance() {
return new JoinResponse();
}
}).txGet(timeout.millis(), TimeUnit.MILLISECONDS).clusterState;
}).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
/**
* Validates the join request, throwing a failure if it failed.
*/
public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState clusterState, TimeValue timeout) throws ElasticsearchException {
transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), EmptyTransportResponseHandler.INSTANCE_SAME)
public void sendValidateJoinRequestBlocking(DiscoveryNode node, TimeValue timeout) throws ElasticsearchException {
transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(), EmptyTransportResponseHandler.INSTANCE_SAME)
.txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
@ -107,31 +112,39 @@ public class MembershipAction extends AbstractComponent {
DiscoveryNode node;
boolean withClusterState;
// here for backward compatibility. nodes with a version lower than 1.4.0 send this flag
boolean withClusterState = false;
private JoinRequest() {
}
private JoinRequest(DiscoveryNode node, boolean withClusterState) {
private JoinRequest(DiscoveryNode node) {
this.node = node;
this.withClusterState = withClusterState;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
node = DiscoveryNode.readNode(in);
withClusterState = in.readBoolean();
if (in.getVersion().before(Version.V_1_4_0)) {
withClusterState = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
node.writeTo(out);
out.writeBoolean(withClusterState);
if (out.getVersion().before(Version.V_1_4_0)) {
// old with cluster state flag
out.writeBoolean(false);
}
}
}
// used to reply to nodes from a version older than 1.4.0 which may expect this
@Deprecated
class JoinResponse extends TransportResponse {
ClusterState clusterState;
@ -169,10 +182,11 @@ public class MembershipAction extends AbstractComponent {
public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
listener.onJoin(request.node, new JoinCallback() {
@Override
public void onSuccess(ClusterState state) {
public void onSuccess() {
try {
// nodes from a version older than 1.4.0 may ask for this
if (request.withClusterState) {
channel.sendResponse(new JoinResponse(state));
channel.sendResponse(new JoinResponse(clusterService.state()));
} else {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@ -200,25 +214,23 @@ public class MembershipAction extends AbstractComponent {
class ValidateJoinRequest extends TransportRequest {
ClusterState clusterState;
ValidateJoinRequest() {
}
ValidateJoinRequest(ClusterState clusterState) {
this.clusterState = clusterState;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
if (in.getVersion().before(Version.V_1_4_0)) {
ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
ClusterState.Builder.writeTo(clusterState, out);
if (out.getVersion().before(Version.V_1_4_0)) {
ClusterState.Builder.writeTo(clusterService.state(), out);
}
}
}