Remove DiscoveryNodes#localNode in favour of existing DiscoveryNodes#getLocalNode

This commit is contained in:
javanna 2016-03-30 15:18:10 +02:00 committed by Luca Cavanna
parent f26d05eac8
commit 7ebc094353
25 changed files with 41 additions and 50 deletions

View File

@ -228,7 +228,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(nodes.localNode()));
listener.onFailure(new NodeClosedException(nodes.getLocalNode()));
}
@Override

View File

@ -491,7 +491,7 @@ public class TransportClientNodesService extends AbstractComponent {
for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...",
entry.getValue().getState().nodes().localNode(), clusterName);
entry.getValue().getState().nodes().getLocalNode(), clusterName);
newFilteredNodes.add(entry.getKey());
continue;
}

View File

@ -747,7 +747,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
@Override
public ClusterState readFrom(StreamInput in) throws IOException {
return readFrom(in, nodes.localNode());
return readFrom(in, nodes.getLocalNode());
}
@Override

View File

@ -80,7 +80,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
final DiscoveryNodes nodes = clusterState.nodes();
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
if (nodes.getLocalNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)", index);
return;
}

View File

@ -188,22 +188,13 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return this.localNodeId;
}
/**
* Get the local node
*
* @return local node
*/
public DiscoveryNode localNode() {
return nodes.get(localNodeId);
}
/**
* Get the local node
*
* @return local node
*/
public DiscoveryNode getLocalNode() {
return localNode();
return nodes.get(localNodeId);
}
/**
@ -415,7 +406,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
sb.append("nodes: \n");
for (DiscoveryNode node : this) {
sb.append(" ").append(node);
if (node == localNode()) {
if (node == getLocalNode()) {
sb.append(", local");
}
if (node == masterNode()) {
@ -563,7 +554,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
@Override
public DiscoveryNodes readFrom(StreamInput in) throws IOException {
return readFrom(in, localNode());
return readFrom(in, getLocalNode());
}
public static Builder builder() {

View File

@ -477,7 +477,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private static List<ShardRouting> collectAttributeShards(AttributesKey key, DiscoveryNodes nodes, ArrayList<ShardRouting> from) {
final ArrayList<ShardRouting> to = new ArrayList<>();
for (final String attribute : key.attributes) {
final String localAttributeValue = nodes.localNode().getAttributes().get(attribute);
final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute);
if (localAttributeValue != null) {
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
ShardRouting fromShard = iterator.next();

View File

@ -198,7 +198,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
@Override
synchronized protected void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterState.nodes().localNode(), "please set the local node before starting");
Objects.requireNonNull(clusterState.nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
@ -229,7 +229,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
* The local node.
*/
public DiscoveryNode localNode() {
return clusterState.getNodes().localNode();
return clusterState.getNodes().getLocalNode();
}
public OperationRouting operationRouting() {
@ -663,9 +663,9 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
//manual ack only from the master at the end of the publish
if (newClusterState.nodes().localNodeMaster()) {
try {
ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
ackListener.onNodeAck(newClusterState.nodes().getLocalNode(), null);
} catch (Throwable t) {
logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().getLocalNode());
}
}

View File

@ -339,7 +339,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
if (nodeSpecificClusterState.nodes().getLocalNode() != null) {
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";

View File

@ -246,7 +246,7 @@ public class NodeJoinController extends AbstractComponent {
throw new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
}
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().getId());
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNode().getId());
// update the fact that we are the master...
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();

View File

@ -246,7 +246,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// if we don't know who the master is, nothing to do here
} else if (!nodes.localNodeMaster()) {
try {
membership.sendLeaveRequestBlocking(nodes.masterNode(), nodes.localNode(), TimeValue.timeValueSeconds(1));
membership.sendLeaveRequestBlocking(nodes.masterNode(), nodes.getLocalNode(), TimeValue.timeValueSeconds(1));
} catch (Exception e) {
logger.debug("failed to send leave request to master [{}]", e, nodes.masterNode());
}
@ -254,11 +254,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// we're master -> let other potential master we left and start a master election now rather then wait for masterFD
DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(nodes.getNodes().values(), 5);
for (DiscoveryNode possibleMaster : possibleMasters) {
if (nodes.localNode().equals(possibleMaster)) {
if (nodes.getLocalNode().equals(possibleMaster)) {
continue;
}
try {
membership.sendLeaveRequest(nodes.localNode(), possibleMaster);
membership.sendLeaveRequest(nodes.getLocalNode(), possibleMaster);
} catch (Exception e) {
logger.debug("failed to send leave request from master [{}] to possible master [{}]", e, nodes.masterNode(), possibleMaster);
}

View File

@ -171,7 +171,7 @@ public class MembershipAction extends AbstractComponent {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
}
@Override

View File

@ -565,7 +565,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
private PingResponse createPingResponse(DiscoveryNodes discoNodes) {
return new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
return new PingResponse(discoNodes.getLocalNode(), discoNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
}
static class UnicastPingResponse extends TransportResponse {

View File

@ -126,7 +126,7 @@ public class PublishClusterStateAction extends AbstractComponent {
try {
nodes = clusterChangedEvent.state().nodes();
nodesToPublishTo = new HashSet<>(nodes.getSize());
DiscoveryNode localNode = nodes.localNode();
DiscoveryNode localNode = nodes.getLocalNode();
final int totalMasterNodes = nodes.getMasterNodes().size();
for (final DiscoveryNode node : nodes) {
if (node.equals(localNode) == false) {
@ -363,7 +363,7 @@ public class PublishClusterStateAction extends AbstractComponent {
final ClusterState incomingState;
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
@ -396,7 +396,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
final DiscoveryNodes currentNodes = nodesProvider.nodes();
if (currentNodes.localNode().equals(incomingState.nodes().localNode()) == false) {
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
}

View File

@ -68,7 +68,7 @@ public class GatewayAllocator extends AbstractComponent {
@Override
public void clusterChanged(ClusterChangedEvent event) {
boolean cleanCache = false;
DiscoveryNode localNode = event.state().nodes().localNode();
DiscoveryNode localNode = event.state().nodes().getLocalNode();
if (localNode != null) {
if (localNode.isMasterNode() == true && event.localNodeMaster() == false) {
cleanCache = true;

View File

@ -118,7 +118,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
Set<Index> relevantIndices = Collections.emptySet();
boolean success = true;
// write the state if this node is a master eligible node or if it is a data node and has shards allocated on it
if (state.nodes().localNode().isMasterNode() || state.nodes().localNode().isDataNode()) {
if (state.nodes().getLocalNode().isMasterNode() || state.nodes().getLocalNode().isDataNode()) {
if (previousMetaData == null) {
try {
// we determine if or if not we write meta data on data only nodes by looking at the shard routing
@ -178,7 +178,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
Set<Index> relevantIndices;
if (isDataOnlyNode(state)) {
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices);
} else if (state.nodes().localNode().isMasterNode() == true) {
} else if (state.nodes().getLocalNode().isMasterNode() == true) {
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
} else {
relevantIndices = Collections.emptySet();
@ -188,7 +188,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
protected static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().localNode().isMasterNode() == false) && state.nodes().localNode().isDataNode());
return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode());
}
/**

View File

@ -535,7 +535,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new IllegalStateException("Can't delete index store for [" + index.getName() + "] - it's still part of the indices service [" + localUUid + "] [" + metaData.getIndexUUID() + "]");
}
if (clusterState.metaData().hasIndex(index.getName()) && (clusterState.nodes().localNode().isMasterNode() == true)) {
if (clusterState.metaData().hasIndex(index.getName()) && (clusterState.nodes().getLocalNode().isMasterNode() == true)) {
// we do not delete the store if it is a master eligible node and the index is still in the cluster state
// because we want to keep the meta data for indices around even if no shards are left here
final IndexMetaData idxMeta = clusterState.metaData().index(index.getName());
@ -608,7 +608,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
logger.debug("{} deleted shard reason [{}]", shardId, reason);
if (clusterState.nodes().localNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards..
if (clusterState.nodes().getLocalNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards..
canDeleteIndexContents(shardId.getIndex(), indexSettings, false)) {
if (nodeEnv.findAllShardIds(shardId.getIndex()).isEmpty()) {
try {

View File

@ -617,7 +617,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.localNode());
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.getLocalNode());
indexShard.markAsRecovering("from " + sourceNode, recoveryState);
recoveryTargetService.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
@ -629,11 +629,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// recover from filesystem store
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
RecoveryState.Type.STORE,
nodes.localNode(), nodes.localNode());
nodes.getLocalNode(), nodes.getLocalNode());
indexShard.markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (indexShard.recoverFromStore(nodes.localNode())) {
if (indexShard.recoverFromStore(nodes.getLocalNode())) {
shardStateAction.shardStarted(shardRouting, "after recovery from store", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable t) {
@ -644,13 +644,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} else {
// recover from a restore
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.localNode());
RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.getLocalNode());
indexShard.markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
final ShardId sId = indexShard.shardId();
try {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
if (indexShard.restoreFromRepository(indexShardRepository, nodes.getLocalNode())) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
shardStateAction.shardStarted(shardRouting, "after recovery from repository", SHARD_STATE_ACTION_LISTENER);
}

View File

@ -160,7 +160,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
// check if shard is active on the current node or is getting relocated to the our node
String localNodeId = state.getNodes().localNode().getId();
String localNodeId = state.getNodes().getLocalNode().getId();
if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) {
return false;
}

View File

@ -69,7 +69,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
public void verify(String repository, String verificationToken, final ActionListener<VerifyResponse> listener) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.localNode();
final DiscoveryNode localNode = discoNodes.getLocalNode();
final ObjectContainer<DiscoveryNode> masterAndDataNodes = discoNodes.getMasterAndDataNodes().values();
final List<DiscoveryNode> nodes = new ArrayList<>();

View File

@ -231,7 +231,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
TaskId taskId = banIterator.next();
if (lastDiscoveryNodes.nodeExists(taskId.getNodeId()) == false) {
logger.debug("Removing ban for the parent [{}] on the node [{}], reason: the parent node is gone", taskId,
event.state().getNodes().localNode());
event.state().getNodes().getLocalNode());
banIterator.remove();
}
}

View File

@ -112,7 +112,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
if (randomIntBetween(0, 10) < 1) {
// Update cluster state via full serialization from time to time
clusterStateFromDiffs = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), previousClusterStateFromDiffs.nodes().localNode());
clusterStateFromDiffs = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), previousClusterStateFromDiffs.nodes().getLocalNode());
} else {
// Update cluster states using diffs
Diff<ClusterState> diffBeforeSerialization = clusterState.diff(previousClusterState);

View File

@ -856,7 +856,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.localNode().getName());
TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName());
if (randomBoolean()) {
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
} else {

View File

@ -94,7 +94,7 @@ public class NodeJoinControllerTests extends ESTestCase {
super.setUp();
clusterService = createClusterService(threadPool);
final DiscoveryNodes initialNodes = clusterService.state().nodes();
final DiscoveryNode localNode = initialNodes.localNode();
final DiscoveryNode localNode = initialNodes.getLocalNode();
// make sure we have a master
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));

View File

@ -235,7 +235,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
ClusterState stateWithCustomMetaData = ClusterState.builder(state).metaData(mdBuilder).build();
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
DiscoveryNode node = state.nodes().localNode();
DiscoveryNode node = state.nodes().getLocalNode();
zenDiscovery.handleJoinRequest(node, stateWithCustomMetaData, new MembershipAction.JoinCallback() {
@Override
public void onSuccess() {

View File

@ -420,7 +420,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
for (MockNode node : nodes.values()) {
assertSameState(node.clusterState, clusterState);
assertThat(node.clusterState.nodes().localNode(), equalTo(node.discoveryNode));
assertThat(node.clusterState.nodes().getLocalNode(), equalTo(node.discoveryNode));
}
}