diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 4a3771c8e5a..03a14fe9cf8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.local; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; @@ -29,7 +28,6 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -44,12 +42,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.InitialStateDiscoveryListener; import org.elasticsearch.node.service.NodeService; -import org.elasticsearch.transport.TransportService; import java.util.HashSet; import java.util.Queue; @@ -67,17 +63,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; - private final TransportService transportService; private final ClusterService clusterService; - private final DiscoveryNodeService discoveryNodeService; private RoutingService routingService; private final ClusterName clusterName; - private final Version version; private final DiscoverySettings discoverySettings; - private DiscoveryNode localNode; - private volatile boolean master = false; private final AtomicBoolean initialStateSent = new AtomicBoolean(); @@ -89,14 +80,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private volatile ClusterState lastProcessedClusterState; @Inject - public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) { + public LocalDiscovery(Settings settings, ClusterName clusterName, ClusterService clusterService, + DiscoverySettings discoverySettings) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; - this.transportService = transportService; - this.discoveryNodeService = discoveryNodeService; - this.version = version; this.discoverySettings = discoverySettings; } @@ -119,8 +107,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem clusterGroups.put(clusterName, clusterGroup); } logger.debug("Connected to cluster [{}]", clusterName); - this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(), - discoveryNodeService.buildAttributes(), version); clusterGroup.members().add(this); @@ -147,7 +133,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.put(discovery.localNode); + nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); // remove the NO_MASTER block in this case @@ -166,30 +152,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } }); } else if (firstMaster != null) { - // update as fast as we can the local node state with the new metadata (so we create indices for example) - final ClusterState masterState = firstMaster.clusterService.state(); - clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - // make sure we have the local node id set, we might need it as a result of the new metadata - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id()); - return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); - // tell the master to send the fact that we are here final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ClusterStateUpdateTask() { + firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() { @Override public boolean runOnlyOnMaster() { return false; @@ -199,7 +164,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.put(discovery.localNode); + nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); return ClusterState.builder(currentState).nodes(nodesBuilder).build(); @@ -254,7 +219,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem final Set newMembers = new HashSet<>(); for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode.id()); + newMembers.add(discovery.localNode().id()); } final LocalDiscovery master = firstMaster; @@ -266,7 +231,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id()); + DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().id()); DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); if (delta.added()) { logger.warn("No new nodes should be created when a new discovery view is accepted"); @@ -293,7 +258,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public DiscoveryNode localNode() { - return localNode; + return clusterService.localNode(); } @Override @@ -308,7 +273,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public String nodeDescription() { - return clusterName.value() + "/" + localNode.id(); + return clusterName.value() + "/" + localNode().id(); } @Override @@ -323,7 +288,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem if (localDiscovery.master) { continue; } - nodesToPublishTo.add(localDiscovery.localNode); + nodesToPublishTo.add(localDiscovery.localNode()); } publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); } @@ -359,7 +324,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem synchronized (this) { // we do the marshaling intentionally, to check it works well... // check if we publsihed cluster state at least once and node was in the cluster when we published cluster state the last time - if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode.id())) { + if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode().id())) { // both conditions are true - which means we can try sending cluster state as diffs if (clusterStateDiffBytes == null) { Diff diff = clusterState.diff(clusterChangedEvent.previousState()); @@ -369,7 +334,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } try { newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState); - logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode.getName()); + logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName()); } catch (IncompatibleClusterStateVersionException ex) { logger.warn("incompatible cluster state version [{}] - resending complete cluster state", ex, clusterState.version()); } @@ -378,7 +343,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem if (clusterStateBytes == null) { clusterStateBytes = Builder.toBytes(clusterState); } - newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); + newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode()); } discovery.lastProcessedClusterState = newNodeSpecificClusterState; } @@ -423,17 +388,17 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Override public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); - publishResponseHandler.onFailure(discovery.localNode, t); + publishResponseHandler.onFailure(discovery.localNode(), t); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { sendInitialStateEventIfNeeded(); - publishResponseHandler.onResponse(discovery.localNode); + publishResponseHandler.onResponse(discovery.localNode()); } }); } else { - publishResponseHandler.onResponse(discovery.localNode); + publishResponseHandler.onResponse(discovery.localNode()); } }