From 06a50fa31ebb5e753010d863a1ca8d2066bf82c4 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 10 Nov 2016 22:28:49 -0500 Subject: [PATCH] ShardActiveResponseHandler shouldn't hold to an entire cluster state ShardActiveResponseHandler doesn't need to hold to an entire cluster state since it only needs to know the cluster state version. It seems that on overloaded systems where nodes are unresponsive holding onto a lot of different cluster states can make the situation worse. Closes #21394 --- .../indices/store/IndicesStore.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index f360af7c2f7..64f09b32223 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -191,7 +191,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } } - ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state, requests.size()); + ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state.getVersion(), + requests.size()); for (Tuple request : requests) { logger.trace("{} sending shard active check to {}", request.v2().shardId, request.v1()); transportService.sendRequest(request.v1(), ACTION_SHARD_EXISTS, request.v2(), responseHandler); @@ -202,14 +203,14 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final ShardId shardId; private final int expectedActiveCopies; - private final ClusterState clusterState; + private final long clusterStateVersion; private final AtomicInteger awaitingResponses; private final AtomicInteger activeCopies; - public ShardActiveResponseHandler(ShardId shardId, ClusterState clusterState, int expectedActiveCopies) { + public ShardActiveResponseHandler(ShardId shardId, long clusterStateVersion, int expectedActiveCopies) { this.shardId = shardId; this.expectedActiveCopies = expectedActiveCopies; - this.clusterState = clusterState; + this.clusterStateVersion = clusterStateVersion; this.awaitingResponses = new AtomicInteger(expectedActiveCopies); this.activeCopies = new AtomicInteger(); } @@ -251,8 +252,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } ClusterState latestClusterState = clusterService.state(); - if (clusterState.getVersion() != latestClusterState.getVersion()) { - logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); + if (clusterStateVersion != latestClusterState.getVersion()) { + logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterStateVersion); return; } @@ -264,8 +265,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe @Override public ClusterState execute(ClusterState currentState) throws Exception { - if (clusterState.getVersion() != currentState.getVersion()) { - logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion()); + if (clusterStateVersion != currentState.getVersion()) { + logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterStateVersion); return currentState; } try {