diff --git a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java index 5a5d4a42ebb..eafbc72f8c5 100644 --- a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java +++ b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java @@ -51,6 +51,7 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -110,7 +111,7 @@ public class SyncedFlushService extends AbstractComponent { * Tries to flush all copies of a shard and write a sync id to it. * After a synced flush two shard copies may only contain the same sync id if they contain the same documents. * To ensure this, synced flush works in three steps: - * 1. Flush all shard copies and gather the commit points for each copy after the flush + * 1. Flush all shard copies and gather the commit ids for each copy after the flush * 2. Ensure that there are no ongoing indexing operations on the primary * 3. Perform an additional flush on each shard copy that writes the sync id * @@ -143,7 +144,8 @@ public class SyncedFlushService extends AbstractComponent { } int inflight = getInflightOpsCount(shardId, state, shardRoutingTable); - if (inflight != 1) { + assert inflight >= -1; + if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0). actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]")); } String syncId = Strings.base64UUID(); @@ -213,6 +215,7 @@ public class SyncedFlushService extends AbstractComponent { } } catch (InterruptedException e) { logger.debug("{} interrupted while waiting for in flight operation check", shardId); + Thread.currentThread().interrupt(); } final int count = result.get(); logger.trace("{} in flight operation count [{}]", shardId, count); @@ -220,26 +223,22 @@ public class SyncedFlushService extends AbstractComponent { } - void sendSyncRequests(final String syncId, List shards, ClusterState state, Map expectedCommitIds, final ShardId shardId, final ActionListener listener) { - final CountDown countDownLatch = new CountDown(shards.size()); + void sendSyncRequests(final String syncId, final List shards, ClusterState state, Map expectedCommitIds, final ShardId shardId, final ActionListener listener) { + final CountDown countDown = new CountDown(shards.size()); final Map results = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); if (node == null) { logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); results.put(shard, new SyncedFlushResponse("unknown node")); - if (countDownLatch.countDown()) { - listener.onResponse(new SyncedFlushResult(shardId, syncId, results)); - } + contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); continue; } final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId()); if (expectedCommitId == null) { logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush")); - if (countDownLatch.countDown()) { - listener.onResponse(new SyncedFlushResult(shardId, syncId, results)); - } + contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); continue; } logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId); @@ -255,18 +254,14 @@ public class SyncedFlushService extends AbstractComponent { SyncedFlushResponse existing = results.put(shard, response); assert existing == null : "got two answers for node [" + node + "]"; // count after the assert so we won't decrement twice in handleException - if (countDownLatch.countDown()) { - listener.onResponse(new SyncedFlushResult(shardId, syncId, results)); - } + contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); } @Override public void handleException(TransportException exp) { logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard); results.put(shard, new SyncedFlushResponse(exp.getMessage())); - if (countDownLatch.countDown()) { - listener.onResponse(new SyncedFlushResult(shardId, syncId, results)); - } + contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); } @Override @@ -278,12 +273,19 @@ public class SyncedFlushService extends AbstractComponent { } + private void contDownAndSendResponseIfDone(String syncId, List shards, ShardId shardId, ActionListener listener, CountDown countDown, Map results) { + if (countDown.countDown()) { + assert results.size() == shards.size(); + listener.onResponse(new SyncedFlushResult(shardId, syncId, results)); + } + } + /** * send presync requests to all started copies of the given shard */ Map sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId) { final CountDownLatch countDownLatch = new CountDownLatch(shards.size()); - final Map commitIds = ConcurrentCollections.newConcurrentMap(); + final ConcurrentMap commitIds = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { logger.trace("{} sending pre-synced flush request to {}", shardId, shard); final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); @@ -300,7 +302,7 @@ public class SyncedFlushService extends AbstractComponent { @Override public void handleResponse(PreSyncedFlushResponse response) { - Engine.CommitId existing = commitIds.put(node.id(), response.commitId()); + Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId()); assert existing == null : "got two answers for node [" + node + "]"; // count after the assert so we won't decrement twice in handleException countDownLatch.countDown(); @@ -324,6 +326,7 @@ public class SyncedFlushService extends AbstractComponent { } } catch (InterruptedException e) { logger.debug("{} interrupted while waiting for presync requests", shardId); + Thread.currentThread().interrupt(); } return commitIds; @@ -488,7 +491,6 @@ public class SyncedFlushService extends AbstractComponent { final static class PreSyncedFlushRequest extends TransportRequest { private ShardId shardId; - PreSyncedFlushRequest() { }