ShardActiveResponseHandler shouldn't hold to an entire cluster state

ShardActiveResponseHandler doesn't need to hold to an entire cluster state since it only needs to know the cluster state version. It seems that on overloaded systems where nodes are unresponsive holding onto a lot of different cluster states can make the situation worse.

Closes #21394
This commit is contained in:
Igor Motov 2016-11-10 22:28:49 -05:00
parent 3001b636db
commit 06a50fa31e
1 changed files with 9 additions and 8 deletions

View File

@ -191,7 +191,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
} }
} }
ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state, requests.size()); ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state.getVersion(),
requests.size());
for (Tuple<DiscoveryNode, ShardActiveRequest> request : requests) { for (Tuple<DiscoveryNode, ShardActiveRequest> request : requests) {
logger.trace("{} sending shard active check to {}", request.v2().shardId, request.v1()); logger.trace("{} sending shard active check to {}", request.v2().shardId, request.v1());
transportService.sendRequest(request.v1(), ACTION_SHARD_EXISTS, request.v2(), responseHandler); transportService.sendRequest(request.v1(), ACTION_SHARD_EXISTS, request.v2(), responseHandler);
@ -202,14 +203,14 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ShardId shardId; private final ShardId shardId;
private final int expectedActiveCopies; private final int expectedActiveCopies;
private final ClusterState clusterState; private final long clusterStateVersion;
private final AtomicInteger awaitingResponses; private final AtomicInteger awaitingResponses;
private final AtomicInteger activeCopies; private final AtomicInteger activeCopies;
public ShardActiveResponseHandler(ShardId shardId, ClusterState clusterState, int expectedActiveCopies) { public ShardActiveResponseHandler(ShardId shardId, long clusterStateVersion, int expectedActiveCopies) {
this.shardId = shardId; this.shardId = shardId;
this.expectedActiveCopies = expectedActiveCopies; this.expectedActiveCopies = expectedActiveCopies;
this.clusterState = clusterState; this.clusterStateVersion = clusterStateVersion;
this.awaitingResponses = new AtomicInteger(expectedActiveCopies); this.awaitingResponses = new AtomicInteger(expectedActiveCopies);
this.activeCopies = new AtomicInteger(); this.activeCopies = new AtomicInteger();
} }
@ -251,8 +252,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
} }
ClusterState latestClusterState = clusterService.state(); ClusterState latestClusterState = clusterService.state();
if (clusterState.getVersion() != latestClusterState.getVersion()) { if (clusterStateVersion != latestClusterState.getVersion()) {
logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterStateVersion);
return; return;
} }
@ -264,8 +265,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
if (clusterState.getVersion() != currentState.getVersion()) { if (clusterStateVersion != currentState.getVersion()) {
logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion()); logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterStateVersion);
return currentState; return currentState;
} }
try { try {