Use ClusterService#localNode instead of checking the cluster state.

The ClusterState can hold an 'invalid' local 'DiscoveryNode' during
node startup and rare race conditions can cause NPEs if an 'invalid'
'DiscoveryNode' is serialized.

Closes #3515
This commit is contained in:
Simon Willnauer 2013-08-15 11:34:27 +02:00
parent cbdaf4950b
commit 27b973830d
14 changed files with 28 additions and 27 deletions

View File

@ -98,7 +98,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio
.interval(request.request.interval)
.threadElementsSnapshotCount(request.request.snapshots);
try {
return new NodeHotThreads(clusterService.state().nodes().localNode(), hotThreads.detect());
return new NodeHotThreads(clusterService.localNode(), hotThreads.detect());
} catch (Exception e) {
throw new ElasticSearchException("failed to detect hot threads", e);
}

View File

@ -116,7 +116,7 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
throw new ElasticSearchIllegalStateException("Restart is disabled");
}
if (!restartRequested.compareAndSet(false, true)) {
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
return new NodesRestartResponse.NodeRestartResponse(clusterService.localNode());
}
logger.info("Restarting in [{}]", request.delay);
threadPool.schedule(request.delay, ThreadPool.Names.GENERIC, new Runnable() {
@ -146,7 +146,7 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
}
}
});
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
return new NodesRestartResponse.NodeRestartResponse(clusterService.localNode());
}
@Override

View File

@ -167,7 +167,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
@ -222,7 +222,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override

View File

@ -37,6 +37,7 @@ public abstract class NodeOperationResponse extends TransportResponse {
}
protected NodeOperationResponse(DiscoveryNode node) {
assert node != null;
this.node = node;
}

View File

@ -483,7 +483,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterState.nodes().localNode()));
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override

View File

@ -259,7 +259,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
// Set up everything, now locally create the index to see that things are ok, and apply
// create the index here (on the master) to validate it can be created, as well as adding the mapping
indicesService.createIndex(request.index, actualIndexSettings, clusterService.state().nodes().localNode().id());
indicesService.createIndex(request.index, actualIndexSettings, clusterService.localNode().id());
indexCreated = true;
// now add the mappings
IndexService indexService = indicesService.indexServiceSafe(request.index);

View File

@ -122,7 +122,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
if (indexService == null) {
// temporarily create the index so we have can parse the filter
try {
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), clusterService.localNode().id());
} catch (Exception e) {
logger.warn("[{}] failed to temporary create in order to apply alias action", e, indexMetaData.index());
continue;

View File

@ -377,7 +377,7 @@ public class MetaDataMappingService extends AbstractComponent {
continue;
}
final IndexMetaData indexMetaData = currentState.metaData().index(index);
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), clusterService.localNode().id());
indicesToClose.add(indexMetaData.index());
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(request.mappingType)) {

View File

@ -69,7 +69,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
listener.onFailure(new MasterNotDiscoveredException("no master to send allocate dangled request"));
return;
}
AllocateDangledRequest request = new AllocateDangledRequest(clusterState.nodes().localNode(), indices);
AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices);
transportService.sendRequest(masterNode, AllocateDangledRequestHandler.ACTION, request, new TransportResponseHandler<AllocateDangledResponse>() {
@Override
public AllocateDangledResponse newInstance() {

View File

@ -127,18 +127,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
if (request.unallocated) {
IndexService indexService = indicesService.indexService(request.shardId.index().name());
if (indexService == null) {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
return new NodeStoreFilesMetaData(clusterService.localNode(), null);
}
if (!indexService.hasShard(request.shardId.id())) {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
return new NodeStoreFilesMetaData(clusterService.localNode(), null);
}
}
IndexMetaData metaData = clusterService.state().metaData().index(request.shardId.index().name());
if (metaData == null) {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
return new NodeStoreFilesMetaData(clusterService.localNode(), null);
}
try {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), listStoreMetaData(request.shardId));
return new NodeStoreFilesMetaData(clusterService.localNode(), listStoreMetaData(request.shardId));
} catch (IOException e) {
throw new ElasticSearchException("Failed to list store metadata for shard [" + request.shardId + "]", e);
}

View File

@ -114,7 +114,7 @@ public class DumpMonitorService extends AbstractComponent {
context = newHashMap();
}
if (clusterService != null) {
context.put("localNode", clusterService.state().nodes().localNode());
context.put("localNode", clusterService.localNode());
}
return context;
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
@ -49,8 +48,6 @@ public class NodeService extends AbstractComponent {
private final MonitorService monitorService;
private final ClusterService clusterService;
private final TransportService transportService;
private final IndicesService indicesService;
@ -67,16 +64,18 @@ public class NodeService extends AbstractComponent {
private final Version version;
private final Discovery disovery;
@Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
ClusterService clusterService, TransportService transportService, IndicesService indicesService,
TransportService transportService, IndicesService indicesService,
PluginsService pluginService, Version version) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
this.clusterService = clusterService;
this.transportService = transportService;
this.indicesService = indicesService;
this.disovery = discovery;
discovery.setNodeService(this);
InetAddress address = NetworkUtils.getLocalAddress();
if (address != null) {
@ -116,7 +115,7 @@ public class NodeService extends AbstractComponent {
}
public NodeInfo info() {
return new NodeInfo(hostname, version, clusterService.state().nodes().localNode(), serviceAttributes,
return new NodeInfo(hostname, version, disovery.localNode(), serviceAttributes,
settings,
monitorService.osService().info(),
monitorService.processService().info(),
@ -131,7 +130,7 @@ public class NodeService extends AbstractComponent {
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean network, boolean transport, boolean http, boolean plugin) {
return new NodeInfo(hostname, version, clusterService.state().nodes().localNode(), serviceAttributes,
return new NodeInfo(hostname, version, disovery.localNode(), serviceAttributes,
settings ? this.settings : null,
os ? monitorService.osService().info() : null,
process ? monitorService.processService().info() : null,
@ -147,7 +146,7 @@ public class NodeService extends AbstractComponent {
public NodeStats stats() {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(clusterService.state().nodes().localNode(), System.currentTimeMillis(), hostname,
return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname,
indicesService.stats(true),
monitorService.osService().stats(),
monitorService.processService().stats(),
@ -163,7 +162,7 @@ public class NodeService extends AbstractComponent {
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean fs, boolean transport, boolean http) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(clusterService.state().nodes().localNode(), System.currentTimeMillis(), hostname,
return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname,
indices.anySet() ? indicesService.stats(true, indices) : null,
os ? monitorService.osService().stats() : null,
process ? monitorService.processService().stats() : null,

View File

@ -61,8 +61,9 @@ public class PublishRiverClusterStateAction extends AbstractComponent {
public void publish(RiverClusterState clusterState) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.localNode();
for (final DiscoveryNode node : discoNodes) {
if (node.equals(discoNodes.localNode())) {
if (node.equals(localNode)) {
// no need to send to our self
continue;
}

View File

@ -340,13 +340,13 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
@Override
public void onMaster() {
logger.info("on master [" + clusterService.state().nodes().localNode() + "]");
logger.info("on master [" + clusterService.localNode() + "]");
master = true;
}
@Override
public void offMaster() {
logger.info("off master [" + clusterService.state().nodes().localNode() + "]");
logger.info("off master [" + clusterService.localNode() + "]");
master = false;
}