Local Discovery - don't create a local DiscoNode, but use the one from cluster service

Long ago (#7834) the owner ship of the local disco node was centralized to the cluster service. LocalDiscovery is still created it's own disco node, which is not used by the cluster service and thus creating confusion (two nodes same name but different ids).

This commit also removes and optimization where when joining a new master we would first copy the master's metadata and only then pull in the rest of the cluster state (and it's nodes).

Closes #16317
This commit is contained in:
Boaz Leskes 2016-01-29 16:20:39 +01:00
parent df80e8f215
commit 856883fcc5
1 changed files with 16 additions and 51 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery.local;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
@ -29,7 +28,6 @@ import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -44,12 +42,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Queue;
@ -67,17 +63,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryNodeService discoveryNodeService;
private RoutingService routingService;
private final ClusterName clusterName;
private final Version version;
private final DiscoverySettings discoverySettings;
private DiscoveryNode localNode;
private volatile boolean master = false;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
@ -89,14 +80,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private volatile ClusterState lastProcessedClusterState;
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
public LocalDiscovery(Settings settings, ClusterName clusterName, ClusterService clusterService,
DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.discoverySettings = discoverySettings;
}
@ -119,8 +107,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
clusterGroups.put(clusterName, clusterGroup);
}
logger.debug("Connected to cluster [{}]", clusterName);
this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(),
discoveryNodeService.buildAttributes(), version);
clusterGroup.members().add(this);
@ -147,7 +133,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode);
nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
// remove the NO_MASTER block in this case
@ -166,30 +152,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
});
} else if (firstMaster != null) {
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = firstMaster.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id());
return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ClusterStateUpdateTask() {
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
@ -199,7 +164,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode);
nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
@ -254,7 +219,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
final Set<String> newMembers = new HashSet<>();
for (LocalDiscovery discovery : clusterGroup.members()) {
newMembers.add(discovery.localNode.id());
newMembers.add(discovery.localNode().id());
}
final LocalDiscovery master = firstMaster;
@ -266,7 +231,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().id());
DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
if (delta.added()) {
logger.warn("No new nodes should be created when a new discovery view is accepted");
@ -293,7 +258,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public DiscoveryNode localNode() {
return localNode;
return clusterService.localNode();
}
@Override
@ -308,7 +273,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public String nodeDescription() {
return clusterName.value() + "/" + localNode.id();
return clusterName.value() + "/" + localNode().id();
}
@Override
@ -323,7 +288,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (localDiscovery.master) {
continue;
}
nodesToPublishTo.add(localDiscovery.localNode);
nodesToPublishTo.add(localDiscovery.localNode());
}
publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
}
@ -359,7 +324,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
synchronized (this) {
// we do the marshaling intentionally, to check it works well...
// check if we publsihed cluster state at least once and node was in the cluster when we published cluster state the last time
if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode.id())) {
if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode().id())) {
// both conditions are true - which means we can try sending cluster state as diffs
if (clusterStateDiffBytes == null) {
Diff diff = clusterState.diff(clusterChangedEvent.previousState());
@ -369,7 +334,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
try {
newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState);
logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode.getName());
logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName());
} catch (IncompatibleClusterStateVersionException ex) {
logger.warn("incompatible cluster state version [{}] - resending complete cluster state", ex, clusterState.version());
}
@ -378,7 +343,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (clusterStateBytes == null) {
clusterStateBytes = Builder.toBytes(clusterState);
}
newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode());
}
discovery.lastProcessedClusterState = newNodeSpecificClusterState;
}
@ -423,17 +388,17 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
publishResponseHandler.onFailure(discovery.localNode, t);
publishResponseHandler.onFailure(discovery.localNode(), t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
publishResponseHandler.onResponse(discovery.localNode);
publishResponseHandler.onResponse(discovery.localNode());
}
});
} else {
publishResponseHandler.onResponse(discovery.localNode);
publishResponseHandler.onResponse(discovery.localNode());
}
}