Remove DiscoveryNodesProvider interface (#24461)

The DiscoveryNodesProvider interface provides an unnecessary abstraction and is just used in conjunction with the existing PingContextProvider interface. This commit removes it.
This commit is contained in:
Yannick Welsch 2017-05-03 16:29:51 +02:00 committed by GitHub
parent 144f96eaeb
commit 7ecb79a8e1
8 changed files with 67 additions and 191 deletions

View File

@ -1,28 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.cluster.node.DiscoveryNodes;
public interface DiscoveryNodesProvider {
DiscoveryNodes nodes();
}

View File

@ -20,11 +20,9 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
public interface PingContextProvider extends DiscoveryNodesProvider {
public interface PingContextProvider {
/** return the current cluster state of the node */
ClusterState clusterState();
}

View File

@ -27,6 +27,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@ -306,7 +307,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
throw new RuntimeException(e);
}
seedNodes.addAll(hostsProvider.buildDynamicNodes());
final DiscoveryNodes nodes = contextProvider.nodes();
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
seedNodes.add(masterNode.value);
@ -457,9 +458,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = pingingRound.id();
pingRequest.timeout = timeout;
DiscoveryNodes discoNodes = contextProvider.nodes();
ClusterState lastState = contextProvider.clusterState();
pingRequest.pingResponse = createPingResponse(discoNodes);
pingRequest.pingResponse = createPingResponse(lastState);
Set<DiscoveryNode> nodesFromResponses = temporalResponses.stream().map(pingResponse -> {
assert clusterName.equals(pingResponse.clusterName()) :
@ -476,7 +477,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
// resolve what we can via the latest cluster state
final Set<DiscoveryNode> nodesToPing = uniqueNodesByAddress.values().stream()
.map(node -> {
DiscoveryNode foundNode = discoNodes.findByAddress(node.getAddress());
DiscoveryNode foundNode = lastState.nodes().findByAddress(node.getAddress());
if (foundNode == null) {
return node;
} else {
@ -594,7 +595,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
() -> temporalResponses.remove(request.pingResponse));
List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
pingResponses.add(createPingResponse(contextProvider.nodes()));
pingResponses.add(createPingResponse(contextProvider.clusterState()));
UnicastPingResponse unicastPingResponse = new UnicastPingResponse();
unicastPingResponse.id = request.id;
@ -647,8 +648,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
}
private PingResponse createPingResponse(DiscoveryNodes discoNodes) {
return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), contextProvider.clusterState());
private PingResponse createPingResponse(ClusterState clusterState) {
DiscoveryNodes discoNodes = clusterState.nodes();
return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterState);
}
static class UnicastPingResponse extends TransportResponse {

View File

@ -258,7 +258,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
masterFD.stop("zen disco stop");
nodesFD.stop();
Releasables.close(zenPing); // stop any ongoing pinging
DiscoveryNodes nodes = nodes();
DiscoveryNodes nodes = clusterState().nodes();
if (sendLeaveRequest) {
if (nodes.getMasterNode() == null) {
// if we don't know who the master is, nothing to do here
@ -290,12 +290,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
IOUtils.close(masterFD, nodesFD);
}
/** start of {@link PingContextProvider } implementation */
@Override
public DiscoveryNodes nodes() {
return clusterState().nodes();
}
@Override
public ClusterState clusterState() {
ClusterState clusterState = state.get();
@ -303,8 +297,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
return clusterState;
}
/** end of {@link PingContextProvider } implementation */
@Override
public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
ClusterState newState = clusterChangedEvent.state();
@ -677,7 +669,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
if (localNodeMaster()) {
removeNode(node, "zen-disco-node-left", "left");
} else if (node.equals(nodes().getMasterNode())) {
} else if (node.equals(clusterState().nodes().getMasterNode())) {
handleMasterGone(node, null, "shut_down");
}
}
@ -1041,7 +1033,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
private boolean localNodeMaster() {
return nodes().isLocalNodeElectedMaster();
return clusterState().nodes().isLocalNodeElectedMaster();
}
private void handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) {

View File

@ -72,7 +72,7 @@ public interface ZenPing extends Releasable {
* @param clusterStateVersion the current cluster state version of that node
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
this.id = idGenerator.incrementAndGet();
this.node = node;
this.master = master;

View File

@ -93,26 +93,14 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
super.finishPingingRound(pingingRound);
}
};
final DiscoveryNodes nodes =
DiscoveryNodes.builder().add(pingTransport.getLocalNode()).build();
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(nodeTransport.getLocalNode())
.add(pingTransport.getLocalNode())
.localNodeId(pingTransport.getLocalNode().getId())
.build();
final ClusterName clusterName = new ClusterName(internalCluster().getClusterName());
final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build();
unicastZenPing.start(new PingContextProvider() {
@Override
public ClusterState clusterState() {
return state;
}
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes
.builder()
.add(nodeTransport.getLocalNode())
.add(pingTransport.getLocalNode())
.localNodeId(pingTransport.getLocalNode().getId())
.build();
}
});
unicastZenPing.start(() -> state);
closeables.push(unicastZenPing);
final CompletableFuture<ZenPing.PingCollection> responses = new CompletableFuture<>();
unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3));

View File

@ -90,7 +90,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
protected ThreadPool threadPool;
protected Map<String, MockNode> nodes = new HashMap<>();
public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener, DiscoveryNodesProvider {
public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener {
public final DiscoveryNode discoveryNode;
public final MockTransportService service;
public MockPublishAction action;
@ -142,7 +142,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
action.pendingStatesQueue().markAsProcessed(newClusterState);
}
@Override
public DiscoveryNodes nodes() {
return clusterState.nodes();
}

View File

@ -189,31 +189,18 @@ public class UnicastZenPingTests extends ESTestCase {
Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build();
TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build();
}
@Override
public ClusterState clusterState() {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A"))
.build();
zenPingA.start(() -> stateA);
closeables.push(zenPingA);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
zenPingB.start(() -> stateB);
closeables.push(zenPingB);
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC,
@ -223,32 +210,18 @@ public class UnicastZenPingTests extends ESTestCase {
return versionD;
}
};
zenPingC.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build();
}
@Override
public ClusterState clusterState() {
return stateMismatch;
}
});
ClusterState stateC = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
zenPingC.start(() -> stateC);
closeables.push(zenPingC);
TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD,
EMPTY_HOSTS_PROVIDER);
zenPingD.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D").build();
}
@Override
public ClusterState clusterState() {
return stateMismatch;
}
});
ClusterState stateD = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D"))
.build();
zenPingD.start(() -> stateD);
closeables.push(zenPingD);
logger.info("ping from UZP_A");
@ -339,45 +312,25 @@ public class UnicastZenPingTests extends ESTestCase {
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build();
}
@Override
public ClusterState clusterState() {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A"))
.build();
zenPingA.start(() -> stateA);
closeables.push(zenPingA);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
zenPingB.start(() -> stateB);
closeables.push(zenPingB);
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER);
zenPingC.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
ClusterState stateC = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
zenPingC.start(() -> stateC);
closeables.push(zenPingC);
// the presence of an unresolvable host should not prevent resolvable hosts from being pinged
@ -657,31 +610,18 @@ public class UnicastZenPingTests extends ESTestCase {
});
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build();
}
@Override
public ClusterState clusterState() {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
final ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A"))
.build();
zenPingA.start(() -> stateA);
closeables.push(zenPingA);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
final ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
zenPingB.start(() -> stateB);
closeables.push(zenPingB);
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait().toList();
@ -716,34 +656,19 @@ public class UnicastZenPingTests extends ESTestCase {
.put("discovery.zen.ping.unicast.hosts", (String) null) // use nodes for simplicity
.build();
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build();
final ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")).build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build();
}
@Override
public ClusterState clusterState() {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
zenPingA.start(() -> stateA);
closeables.push(zenPingA);
// Node B doesn't know about A!
final ClusterState stateB = ClusterState.builder(state).nodes(
DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")).build();
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
zenPingB.start(() -> stateB);
closeables.push(zenPingB);
{