From 7ecb79a8e1e087fd92324ad1efb09d363b39587f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 May 2017 16:29:51 +0200 Subject: [PATCH] Remove DiscoveryNodesProvider interface (#24461) The DiscoveryNodesProvider interface provides an unnecessary abstraction and is just used in conjunction with the existing PingContextProvider interface. This commit removes it. --- .../discovery/zen/DiscoveryNodesProvider.java | 28 --- .../discovery/zen/PingContextProvider.java | 4 +- .../discovery/zen/UnicastZenPing.java | 16 +- .../discovery/zen/ZenDiscovery.java | 14 +- .../elasticsearch/discovery/zen/ZenPing.java | 2 +- .../single/SingleNodeDiscoveryIT.java | 24 +-- .../zen/PublishClusterStateActionTests.java | 3 +- .../discovery/zen/UnicastZenPingTests.java | 167 +++++------------- 8 files changed, 67 insertions(+), 191 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java b/core/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java deleted file mode 100644 index 247839397e0..00000000000 --- a/core/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.zen; - -import org.elasticsearch.cluster.node.DiscoveryNodes; - -public interface DiscoveryNodesProvider { - - DiscoveryNodes nodes(); - -} diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java b/core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java index b705c918392..7567b69cfe4 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PingContextProvider.java @@ -20,11 +20,9 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; -public interface PingContextProvider extends DiscoveryNodesProvider { +public interface PingContextProvider { /** return the current cluster state of the node */ ClusterState clusterState(); - } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index efb537e7520..3a68b2b4cd8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -27,6 +27,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -306,7 +307,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { throw new RuntimeException(e); } seedNodes.addAll(hostsProvider.buildDynamicNodes()); - final DiscoveryNodes nodes = contextProvider.nodes(); + final DiscoveryNodes nodes = contextProvider.clusterState().nodes(); // add all possible master nodes that were active in the last known cluster configuration for (ObjectCursor masterNode : nodes.getMasterNodes().values()) { seedNodes.add(masterNode.value); @@ -457,9 +458,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = pingingRound.id(); pingRequest.timeout = timeout; - DiscoveryNodes discoNodes = contextProvider.nodes(); + ClusterState lastState = contextProvider.clusterState(); - pingRequest.pingResponse = createPingResponse(discoNodes); + pingRequest.pingResponse = createPingResponse(lastState); Set nodesFromResponses = temporalResponses.stream().map(pingResponse -> { assert clusterName.equals(pingResponse.clusterName()) : @@ -476,7 +477,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { // resolve what we can via the latest cluster state final Set nodesToPing = uniqueNodesByAddress.values().stream() .map(node -> { - DiscoveryNode foundNode = discoNodes.findByAddress(node.getAddress()); + DiscoveryNode foundNode = lastState.nodes().findByAddress(node.getAddress()); if (foundNode == null) { return node; } else { @@ -594,7 +595,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { () -> temporalResponses.remove(request.pingResponse)); List pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses); - pingResponses.add(createPingResponse(contextProvider.nodes())); + pingResponses.add(createPingResponse(contextProvider.clusterState())); UnicastPingResponse unicastPingResponse = new UnicastPingResponse(); unicastPingResponse.id = request.id; @@ -647,8 +648,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } } - private PingResponse createPingResponse(DiscoveryNodes discoNodes) { - return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), contextProvider.clusterState()); + private PingResponse createPingResponse(ClusterState clusterState) { + DiscoveryNodes discoNodes = clusterState.nodes(); + return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterState); } static class UnicastPingResponse extends TransportResponse { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index af32afb99c9..fe94cea597d 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -258,7 +258,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover masterFD.stop("zen disco stop"); nodesFD.stop(); Releasables.close(zenPing); // stop any ongoing pinging - DiscoveryNodes nodes = nodes(); + DiscoveryNodes nodes = clusterState().nodes(); if (sendLeaveRequest) { if (nodes.getMasterNode() == null) { // if we don't know who the master is, nothing to do here @@ -290,12 +290,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover IOUtils.close(masterFD, nodesFD); } - /** start of {@link PingContextProvider } implementation */ - @Override - public DiscoveryNodes nodes() { - return clusterState().nodes(); - } - @Override public ClusterState clusterState() { ClusterState clusterState = state.get(); @@ -303,8 +297,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover return clusterState; } - /** end of {@link PingContextProvider } implementation */ - @Override public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) { ClusterState newState = clusterChangedEvent.state(); @@ -677,7 +669,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } if (localNodeMaster()) { removeNode(node, "zen-disco-node-left", "left"); - } else if (node.equals(nodes().getMasterNode())) { + } else if (node.equals(clusterState().nodes().getMasterNode())) { handleMasterGone(node, null, "shut_down"); } } @@ -1041,7 +1033,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } private boolean localNodeMaster() { - return nodes().isLocalNodeElectedMaster(); + return clusterState().nodes().isLocalNodeElectedMaster(); } private void handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index 622c4649db2..016d2a5423c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -72,7 +72,7 @@ public interface ZenPing extends Releasable { * @param clusterStateVersion the current cluster state version of that node * ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered) */ - public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { + PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { this.id = idGenerator.incrementAndGet(); this.node = node; this.master = master; diff --git a/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java index 1b529bf5bd0..d2a520c3284 100644 --- a/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -93,26 +93,14 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase { super.finishPingingRound(pingingRound); } }; - final DiscoveryNodes nodes = - DiscoveryNodes.builder().add(pingTransport.getLocalNode()).build(); + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(nodeTransport.getLocalNode()) + .add(pingTransport.getLocalNode()) + .localNodeId(pingTransport.getLocalNode().getId()) + .build(); final ClusterName clusterName = new ClusterName(internalCluster().getClusterName()); final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build(); - unicastZenPing.start(new PingContextProvider() { - @Override - public ClusterState clusterState() { - return state; - } - - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes - .builder() - .add(nodeTransport.getLocalNode()) - .add(pingTransport.getLocalNode()) - .localNodeId(pingTransport.getLocalNode().getId()) - .build(); - } - }); + unicastZenPing.start(() -> state); closeables.push(unicastZenPing); final CompletableFuture responses = new CompletableFuture<>(); unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3)); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 3e90f414760..863bf80085b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -90,7 +90,7 @@ public class PublishClusterStateActionTests extends ESTestCase { protected ThreadPool threadPool; protected Map nodes = new HashMap<>(); - public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener, DiscoveryNodesProvider { + public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener { public final DiscoveryNode discoveryNode; public final MockTransportService service; public MockPublishAction action; @@ -142,7 +142,6 @@ public class PublishClusterStateActionTests extends ESTestCase { action.pendingStatesQueue().markAsProcessed(newClusterState); } - @Override public DiscoveryNodes nodes() { return clusterState.nodes(); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 5df6bd214f3..fa53f94f42c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -189,31 +189,18 @@ public class UnicastZenPingTests extends ESTestCase { Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build(); TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); - zenPingA.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); - } - - @Override - public ClusterState clusterState() { - return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); - } - }); + ClusterState stateA = ClusterState.builder(state) + .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) + .nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A")) + .build(); + zenPingA.start(() -> stateA); closeables.push(zenPingA); TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); - zenPingB.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); - } - - @Override - public ClusterState clusterState() { - return state; - } - }); + ClusterState stateB = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) + .build(); + zenPingB.start(() -> stateB); closeables.push(zenPingB); TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC, @@ -223,32 +210,18 @@ public class UnicastZenPingTests extends ESTestCase { return versionD; } }; - zenPingC.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build(); - } - - @Override - public ClusterState clusterState() { - return stateMismatch; - } - }); + ClusterState stateC = ClusterState.builder(stateMismatch) + .nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C")) + .build(); + zenPingC.start(() -> stateC); closeables.push(zenPingC); TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD, EMPTY_HOSTS_PROVIDER); - zenPingD.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D").build(); - } - - @Override - public ClusterState clusterState() { - return stateMismatch; - } - }); + ClusterState stateD = ClusterState.builder(stateMismatch) + .nodes(DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D")) + .build(); + zenPingD.start(() -> stateD); closeables.push(zenPingD); logger.info("ping from UZP_A"); @@ -339,45 +312,25 @@ public class UnicastZenPingTests extends ESTestCase { final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); - zenPingA.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); - } - - @Override - public ClusterState clusterState() { - return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); - } - }); + ClusterState stateA = ClusterState.builder(state) + .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) + .nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A")) + .build(); + zenPingA.start(() -> stateA); closeables.push(zenPingA); TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); - zenPingB.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); - } - - @Override - public ClusterState clusterState() { - return state; - } - }); + ClusterState stateB = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) + .build(); + zenPingB.start(() -> stateB); closeables.push(zenPingB); TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER); - zenPingC.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build(); - } - - @Override - public ClusterState clusterState() { - return state; - } - }); + ClusterState stateC = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C")) + .build(); + zenPingC.start(() -> stateC); closeables.push(zenPingC); // the presence of an unresolvable host should not prevent resolvable hosts from being pinged @@ -657,31 +610,18 @@ public class UnicastZenPingTests extends ESTestCase { }); final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); - zenPingA.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build(); - } - - @Override - public ClusterState clusterState() { - return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); - } - }); + final ClusterState stateA = ClusterState.builder(state) + .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) + .nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")) + .build(); + zenPingA.start(() -> stateA); closeables.push(zenPingA); TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); - zenPingB.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); - } - - @Override - public ClusterState clusterState() { - return state; - } - }); + final ClusterState stateB = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) + .build(); + zenPingB.start(() -> stateB); closeables.push(zenPingB); Collection pingResponses = zenPingA.pingAndWait().toList(); @@ -716,34 +656,19 @@ public class UnicastZenPingTests extends ESTestCase { .put("discovery.zen.ping.unicast.hosts", (String) null) // use nodes for simplicity .build(); final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); + final ClusterState stateA = ClusterState.builder(state) + .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) + .nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")).build(); final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); - zenPingA.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build(); - } - - @Override - public ClusterState clusterState() { - return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); - } - }); + zenPingA.start(() -> stateA); closeables.push(zenPingA); // Node B doesn't know about A! + final ClusterState stateB = ClusterState.builder(state).nodes( + DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")).build(); TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); - zenPingB.start(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); - } - - @Override - public ClusterState clusterState() { - return state; - } - }); + zenPingB.start(() -> stateB); closeables.push(zenPingB); {