Remove DiscoveryNodes#localNodeId in favour of existing DiscoveryNodes#getLocalNodeId

This commit is contained in:
javanna 2016-03-30 15:17:09 +02:00 committed by Luca Cavanna
parent f8b5d1f5b0
commit f26d05eac8
16 changed files with 38 additions and 47 deletions

View File

@ -449,7 +449,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
taskManager.registerChildTask(task, node.getId());
if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node);
} else {
performRemoteAction(state, primary, node);
@ -909,12 +909,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// we never execute replication operation locally as primary operation has already completed locally
// hence, we ignore any local shard for replication
if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
if (nodes.getLocalNodeId().equals(shard.currentNodeId()) == false) {
onLocalShard.accept(shard);
}
// send operation to relocating shard
// local shard can be a relocation target of a primary that is in relocated state
if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
if (shard.relocating() && nodes.getLocalNodeId().equals(shard.relocatingNodeId()) == false) {
onRelocatingShard.accept(shard);
}
}

View File

@ -179,22 +179,13 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return this.masterNodeId;
}
/**
* Get the id of the local node
*
* @return id of the local node
*/
public String localNodeId() {
return this.localNodeId;
}
/**
* Get the id of the local node
*
* @return id of the local node
*/
public String getLocalNodeId() {
return localNodeId();
return this.localNodeId;
}
/**
@ -302,7 +293,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
ObjectHashSet<String> resolvedNodesIds = new ObjectHashSet<>(nodesIds.length);
for (String nodeId : nodesIds) {
if (nodeId.equals("_local")) {
String localNodeId = localNodeId();
String localNodeId = getLocalNodeId();
if (localNodeId != null) {
resolvedNodesIds.add(localNodeId);
}
@ -595,7 +586,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
public Builder(DiscoveryNodes nodes) {
this.masterNodeId = nodes.getMasterNodeId();
this.localNodeId = nodes.localNodeId();
this.localNodeId = nodes.getLocalNodeId();
this.nodes = ImmutableOpenMap.builder(nodes.getNodes());
}

View File

@ -58,12 +58,12 @@ public class OperationRouting extends AbstractComponent {
}
public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) {
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
}
public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) {
final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId);
return preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
}
public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing) {
@ -75,7 +75,7 @@ public class OperationRouting extends AbstractComponent {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
if (iterator != null) {
set.add(iterator);
}

View File

@ -158,7 +158,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
}
synchronized public void setLocalNode(DiscoveryNode localNode) {
assert clusterState.nodes().localNodeId() == null : "local node is already set";
assert clusterState.nodes().getLocalNodeId() == null : "local node is already set";
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).put(localNode).localNodeId(localNode.getId());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
}

View File

@ -967,7 +967,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
// in the past (after a master failure, for example)
transportService.connectToNode(otherMaster);
transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {

View File

@ -328,7 +328,7 @@ public class MasterFaultDetection extends FaultDetection {
final DiscoveryNodes nodes = clusterService.state().nodes();
// check if we are really the same master as the one we seemed to be think we are
// this can happen if the master got "kill -9" and then another node started using the same port
if (!request.masterNodeId.equals(nodes.localNodeId())) {
if (!request.masterNodeId.equals(nodes.getLocalNodeId())) {
throw new ThisIsNotTheMasterYouAreLookingForException();
}

View File

@ -469,7 +469,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
try {
DiscoveryNodes discoveryNodes = contextProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (pingResponse.node().getId().equals(discoveryNodes.localNodeId())) {
if (pingResponse.node().getId().equals(discoveryNodes.getLocalNodeId())) {
// that's us, ignore
continue;
}

View File

@ -285,7 +285,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
public static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) {
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
}

View File

@ -193,7 +193,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void cleanFailedShards(final ClusterChangedEvent event) {
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().getLocalNodeId());
if (routingNode == null) {
failedShards.clear();
return;
@ -221,7 +221,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void applyDeletedIndices(final ClusterChangedEvent event) {
final ClusterState previousState = event.previousState();
final String localNodeId = event.state().nodes().localNodeId();
final String localNodeId = event.state().nodes().getLocalNodeId();
assert localNodeId != null;
for (Index index : event.indicesDeleted()) {
@ -259,7 +259,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void applyDeletedShards(final ClusterChangedEvent event) {
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().getLocalNodeId());
if (routingNode == null) {
return;
}
@ -315,7 +315,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
final Set<Index> hasAllocations = new HashSet<>();
final RoutingNode node = event.state().getRoutingNodes().node(event.state().nodes().localNodeId());
final RoutingNode node = event.state().getRoutingNodes().node(event.state().nodes().getLocalNodeId());
// if no shards are allocated ie. if this node is a master-only node it can return nul
if (node != null) {
for (ShardRouting routing : node) {
@ -362,7 +362,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void applyNewIndices(final ClusterChangedEvent event) {
// we only create indices for shards that are allocated
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().getLocalNodeId());
if (routingNode == null) {
return;
}
@ -407,7 +407,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (requireRefresh && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index.getName(), indexMetaData.getIndexUUID(),
event.state().nodes().localNodeId())
event.state().nodes().getLocalNodeId())
);
}
} catch (Throwable t) {
@ -459,7 +459,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
RoutingTable routingTable = event.state().routingTable();
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().getLocalNodeId());
if (routingNode == null) {
failedShards.clear();

View File

@ -301,9 +301,9 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
if (!ignoreShards.contains(shard)) {
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().localNodeId()));
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
} else {
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreInProgress.State.FAILURE));
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE));
}
}
}
@ -486,7 +486,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) {
logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreInProgress.State.SUCCESS));
new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.SUCCESS));
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
@ -760,7 +760,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
public void failRestore(SnapshotId snapshotId, ShardId shardId) {
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreInProgress.State.FAILURE));
new ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE));
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}

View File

@ -255,12 +255,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshotId(), shard.key,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.SUCCESS));
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshotId(), shard.key,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
break;
default:
throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage());
@ -382,12 +382,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
// but we think the shard is done - we need to make new master know that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshotId(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshotId(), shardId,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.SUCCESS));
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
} else if (localShard.getValue().stage() == IndexShardSnapshotStatus.Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshotId(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshotId(), shardId,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure()));
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure()));
}
}

View File

@ -376,7 +376,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final List<CapturingTransport.CapturedRequest> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
if (clusterService.state().nodes().localNodeId().equals(primaryNodeId)) {
if (clusterService.state().nodes().getLocalNodeId().equals(primaryNodeId)) {
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
assertPhase(task, "waiting_on_primary");
} else {
@ -722,7 +722,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertEquals(request.shardId, replicationRequest.shardId);
}
String localNodeId = clusterService.state().getNodes().localNodeId();
String localNodeId = clusterService.state().getNodes().getLocalNodeId();
// no request was sent to the local node
assertThat(nodesSentTo.keySet(), not(hasItem(localNodeId)));

View File

@ -134,7 +134,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
// Check nodes
assertThat(clusterStateFromDiffs.nodes().getNodes(), equalTo(clusterState.nodes().getNodes()));
assertThat(clusterStateFromDiffs.nodes().localNodeId(), equalTo(previousClusterStateFromDiffs.nodes().localNodeId()));
assertThat(clusterStateFromDiffs.nodes().getLocalNodeId(), equalTo(previousClusterStateFromDiffs.nodes().getLocalNodeId()));
assertThat(clusterStateFromDiffs.nodes().getNodes(), equalTo(clusterState.nodes().getNodes()));
for (ObjectCursor<String> node : clusterStateFromDiffs.nodes().getNodes().keys()) {
DiscoveryNode node1 = clusterState.nodes().get(node.value);

View File

@ -124,7 +124,7 @@ public class DiscoveryNodesTests extends ESTestCase {
LOCAL("_local") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
return Collections.singleton(nodes.localNodeId());
return Collections.singleton(nodes.getLocalNodeId());
}
}, ELECTED_MASTER("_master") {
@Override

View File

@ -123,7 +123,7 @@ public class ClusterServiceTests extends ESTestCase {
ClusterState state = timedClusterService.state();
final DiscoveryNodes nodes = state.nodes();
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes)
.masterNodeId(makeMaster ? nodes.localNodeId() : null);
.masterNodeId(makeMaster ? nodes.getLocalNodeId() : null);
state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.nodes(nodesBuilder).build();
setState(timedClusterService, state);

View File

@ -49,8 +49,8 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId();
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
@ -93,8 +93,8 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId();
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();