From 856883fcc51a38f44e6d02c501eacebd088c5523 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 29 Jan 2016 16:20:39 +0100 Subject: [PATCH] Local Discovery - don't create a local DiscoNode, but use the one from cluster service Long ago (#7834) the owner ship of the local disco node was centralized to the cluster service. LocalDiscovery is still created it's own disco node, which is not used by the cluster service and thus creating confusion (two nodes same name but different ids). This commit also removes and optimization where when joining a new master we would first copy the master's metadata and only then pull in the rest of the cluster state (and it's nodes). Closes #16317 --- .../discovery/local/LocalDiscovery.java | 67 +++++-------------- 1 file changed, 16 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 4a3771c8e5a..03a14fe9cf8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.local; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; @@ -29,7 +28,6 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -44,12 +42,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.InitialStateDiscoveryListener; import org.elasticsearch.node.service.NodeService; -import org.elasticsearch.transport.TransportService; import java.util.HashSet; import java.util.Queue; @@ -67,17 +63,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; - private final TransportService transportService; private final ClusterService clusterService; - private final DiscoveryNodeService discoveryNodeService; private RoutingService routingService; private final ClusterName clusterName; - private final Version version; private final DiscoverySettings discoverySettings; - private DiscoveryNode localNode; - private volatile boolean master = false; private final AtomicBoolean initialStateSent = new AtomicBoolean(); @@ -89,14 +80,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private volatile ClusterState lastProcessedClusterState; @Inject - public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) { + public LocalDiscovery(Settings settings, ClusterName clusterName, ClusterService clusterService, + DiscoverySettings discoverySettings) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; - this.transportService = transportService; - this.discoveryNodeService = discoveryNodeService; - this.version = version; this.discoverySettings = discoverySettings; } @@ -119,8 +107,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem clusterGroups.put(clusterName, clusterGroup); } logger.debug("Connected to cluster [{}]", clusterName); - this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(), - discoveryNodeService.buildAttributes(), version); clusterGroup.members().add(this); @@ -147,7 +133,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.put(discovery.localNode); + nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); // remove the NO_MASTER block in this case @@ -166,30 +152,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } }); } else if (firstMaster != null) { - // update as fast as we can the local node state with the new metadata (so we create indices for example) - final ClusterState masterState = firstMaster.clusterService.state(); - clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - // make sure we have the local node id set, we might need it as a result of the new metadata - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id()); - return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); - // tell the master to send the fact that we are here final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ClusterStateUpdateTask() { + firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() { @Override public boolean runOnlyOnMaster() { return false; @@ -199,7 +164,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.put(discovery.localNode); + nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); return ClusterState.builder(currentState).nodes(nodesBuilder).build(); @@ -254,7 +219,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem final Set newMembers = new HashSet<>(); for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode.id()); + newMembers.add(discovery.localNode().id()); } final LocalDiscovery master = firstMaster; @@ -266,7 +231,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id()); + DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().id()); DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); if (delta.added()) { logger.warn("No new nodes should be created when a new discovery view is accepted"); @@ -293,7 +258,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public DiscoveryNode localNode() { - return localNode; + return clusterService.localNode(); } @Override @@ -308,7 +273,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public String nodeDescription() { - return clusterName.value() + "/" + localNode.id(); + return clusterName.value() + "/" + localNode().id(); } @Override @@ -323,7 +288,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem if (localDiscovery.master) { continue; } - nodesToPublishTo.add(localDiscovery.localNode); + nodesToPublishTo.add(localDiscovery.localNode()); } publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); } @@ -359,7 +324,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem synchronized (this) { // we do the marshaling intentionally, to check it works well... // check if we publsihed cluster state at least once and node was in the cluster when we published cluster state the last time - if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode.id())) { + if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode().id())) { // both conditions are true - which means we can try sending cluster state as diffs if (clusterStateDiffBytes == null) { Diff diff = clusterState.diff(clusterChangedEvent.previousState()); @@ -369,7 +334,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } try { newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState); - logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode.getName()); + logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName()); } catch (IncompatibleClusterStateVersionException ex) { logger.warn("incompatible cluster state version [{}] - resending complete cluster state", ex, clusterState.version()); } @@ -378,7 +343,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem if (clusterStateBytes == null) { clusterStateBytes = Builder.toBytes(clusterState); } - newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); + newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode()); } discovery.lastProcessedClusterState = newNodeSpecificClusterState; } @@ -423,17 +388,17 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); - publishResponseHandler.onFailure(discovery.localNode, t); + publishResponseHandler.onFailure(discovery.localNode(), t); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { sendInitialStateEventIfNeeded(); - publishResponseHandler.onResponse(discovery.localNode); + publishResponseHandler.onResponse(discovery.localNode()); } }); } else { - publishResponseHandler.onResponse(discovery.localNode); + publishResponseHandler.onResponse(discovery.localNode()); } }