Remove DiscoveryNodes#dataNodes in favour of existing DiscoveryNodes#getDataNodes

This commit is contained in:
javanna 2016-03-30 15:12:34 +02:00 committed by Luca Cavanna
parent c175e07c8a
commit 463fbe45c6
14 changed files with 20 additions and 29 deletions

View File

@ -495,7 +495,7 @@ public class TransportClientNodesService extends AbstractComponent {
newFilteredNodes.add(entry.getKey());
continue;
}
for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
newNodes.add(cursor.value);
}
}

View File

@ -92,7 +92,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, S
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
validationFailures = validation.failures();
numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().dataNodes().size();
numberOfDataNodes = clusterState.nodes().getDataNodes().size();
for (String index : concreteIndices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);

View File

@ -103,7 +103,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
// wait for events from all nodes that it has been removed from their respective metadata...
int count = currentState.nodes().getSize();
// add the notifications that the store was deleted from *data* nodes
count += currentState.nodes().dataNodes().size();
count += currentState.nodes().getDataNodes().size();
final AtomicInteger counter = new AtomicInteger(count * indices.size());
// this listener will be notified once we get back a notification based on the cluster state change below.

View File

@ -83,7 +83,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
return;
}
// we will want to know this for translating "all" to a number
final int dataNodeCount = event.state().nodes().dataNodes().size();
final int dataNodeCount = event.state().nodes().getDataNodes().size();
Map<Integer, List<Index>> nrReplicasChanged = new HashMap<>();
// we need to do this each time in case it was changed by update settings

View File

@ -114,22 +114,13 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return this.nodes;
}
/**
* Get a {@link Map} of the discovered data nodes arranged by their ids
*
* @return {@link Map} of the discovered data nodes arranged by their ids
*/
public ImmutableOpenMap<String, DiscoveryNode> dataNodes() {
return this.dataNodes;
}
/**
* Get a {@link Map} of the discovered data nodes arranged by their ids
*
* @return {@link Map} of the discovered data nodes arranged by their ids
*/
public ImmutableOpenMap<String, DiscoveryNode> getDataNodes() {
return dataNodes();
return this.dataNodes;
}
/**

View File

@ -230,7 +230,7 @@ public class OperationRouting extends AbstractComponent {
}
private void ensureNodeIdExists(DiscoveryNodes nodes, String nodeId) {
if (!nodes.dataNodes().keys().contains(nodeId)) {
if (!nodes.getDataNodes().keys().contains(nodeId)) {
throw new IllegalArgumentException("No data node with id[" + nodeId + "] found");
}
}

View File

@ -86,7 +86,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
Map<String, List<ShardRouting>> nodesToShards = new HashMap<>();
// fill in the nodeToShards with the "live" nodes
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
nodesToShards.put(cursor.value.getId(), new ArrayList<>());
}

View File

@ -406,7 +406,7 @@ public class AllocationService extends AbstractComponent {
*/
private void applyNewNodes(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
if (!routingNodes.isKnown(node)) {
routingNodes.addNode(node);
@ -418,7 +418,7 @@ public class AllocationService extends AbstractComponent {
boolean changed = false;
for (RoutingNodes.RoutingNodesIterator it = allocation.routingNodes().nodes(); it.hasNext(); ) {
RoutingNode node = it.next();
if (allocation.nodes().dataNodes().containsKey(node.nodeId())) {
if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
// its a live node, continue
continue;
}

View File

@ -618,7 +618,7 @@ public class DiskThresholdDecider extends AllocationDecider {
}
// Allow allocation regardless if only a single data node is available
if (allocation.nodes().dataNodes().size() <= 1) {
if (allocation.nodes().getDataNodes().size() <= 1) {
if (logger.isTraceEnabled()) {
logger.trace("only a single data node is present, allowing allocation");
}

View File

@ -223,7 +223,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
*/
private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, DiscoveryNodes nodes) {
// verify that all current data nodes are there
for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.dataNodes()) {
for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getDataNodes()) {
DiscoveryNode node = cursor.value;
if (shardCache.containsKey(node.getId()) == false) {
shardCache.put(node.getId(), new NodeEntry<T>(node.getId()));

View File

@ -168,9 +168,9 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [{}] < recover_after_nodes [{}]",
nodes.masterAndDataNodes().size(), recoverAfterNodes);
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
logger.debug("not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.dataNodes().size(), recoverAfterDataNodes);
nodes.getDataNodes().size(), recoverAfterDataNodes);
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [{}] < recover_after_master_nodes [{}]",
nodes.masterNodes().size(), recoverAfterMasterNodes);
@ -188,9 +188,9 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedNodes + "] nodes, but only have [" + nodes.masterAndDataNodes().size() + "]";
} else if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
} else if (expectedDataNodes != -1 && (nodes.getDataNodes().size() < expectedDataNodes)) { // does not meet the expected...
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.dataNodes().size() + "]";
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
} else if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedMasterNodes + "] master nodes, but only have [" + nodes.masterNodes().size() + "]";

View File

@ -216,7 +216,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
* Can the shard be allocated on at least one node based on the allocation deciders.
*/
private boolean canBeAllocatedToAtLeastOneNode(ShardRouting shard, RoutingAllocation allocation) {
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
RoutingNode node = allocation.routingNodes().node(cursor.value.getId());
if (node == null) {
continue;

View File

@ -64,8 +64,8 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
indexingThread.start();
ClusterState initialState = client().admin().cluster().prepareState().get().getState();
DiscoveryNode[] dataNodes = initialState.getNodes().dataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode relocationSource = initialState.getNodes().dataNodes().get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId());
DiscoveryNode[] dataNodes = initialState.getNodes().getDataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode relocationSource = initialState.getNodes().getDataNodes().get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId());
for (int i = 0; i < RELOCATION_COUNT; i++) {
DiscoveryNode relocationTarget = randomFrom(dataNodes);
while (relocationTarget.equals(relocationSource)) {

View File

@ -427,8 +427,8 @@ public class TribeIT extends ESIntegTestCase {
@Override
public void run() {
DiscoveryNodes tribeNodes = tribeNode.client().admin().cluster().prepareState().get().getState().getNodes();
assertThat(countDataNodesForTribe("t1", tribeNodes), equalTo(internalCluster().client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size()));
assertThat(countDataNodesForTribe("t2", tribeNodes), equalTo(cluster2.client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size()));
assertThat(countDataNodesForTribe("t1", tribeNodes), equalTo(internalCluster().client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size()));
assertThat(countDataNodesForTribe("t2", tribeNodes), equalTo(cluster2.client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size()));
}
});
}