diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
index c97ba8ec543..b98c5a0db57 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
@@ -69,6 +69,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
         return id;
     }
 
+    /**
+     * A raw version of the commit id (see {@link SegmentInfos#getId()})
+     */
+    public Engine.CommitId getRawCommitId() {
+        return new Engine.CommitId(Base64.getDecoder().decode(id));
+    }
+
     /**
      * Returns the number of documents in the in this commit
      */
diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
index ffc93f024d1..77a5b7fc7e2 100644
--- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
+++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
@@ -21,6 +21,7 @@ package org.elasticsearch.indices.flush;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
@@ -44,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -199,10 +201,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
                 return;
             }
 
-            final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
+            final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener = new ActionListener<Map<String, PreSyncedFlushResponse>>() {
                 @Override
-                public void onResponse(final Map<String, Engine.CommitId> commitIds) {
-                    if (commitIds.isEmpty()) {
+                public void onResponse(final Map<String, PreSyncedFlushResponse> presyncResponses) {
+                    if (presyncResponses.isEmpty()) {
                         actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
                         return;
                     }
@@ -216,7 +218,7 @@
                         } else {
                             // 3. now send the sync request to all the shards
                             String syncId = UUIDs.randomBase64UUID();
-                            sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
+                            sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
                         }
                     }
 
@@ -236,7 +238,7 @@
             };
 
             // 1. send pre-sync flushes to all replicas
-            sendPreSyncRequests(activeShards, state, shardId, commitIdsListener);
+            sendPreSyncRequests(activeShards, state, shardId, presyncListener);
         } catch (Exception e) {
             actionListener.onFailure(e);
         }
@@ -299,28 +301,49 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         }
     }
 
+    private int numDocsOnPrimary(List<ShardRouting> shards, Map<String, PreSyncedFlushResponse> preSyncResponses) {
+        for (ShardRouting shard : shards) {
+            if (shard.primary()) {
+                final PreSyncedFlushResponse resp = preSyncResponses.get(shard.currentNodeId());
+                if (resp != null) {
+                    return resp.numDocs;
+                }
+            }
+        }
+        return PreSyncedFlushResponse.UNKNOWN_NUM_DOCS;
+    }
+
-    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
+    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, PreSyncedFlushResponse> preSyncResponses,
                           final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
         final CountDown countDown = new CountDown(shards.size());
         final Map<ShardRouting, ShardSyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
+        final int numDocsOnPrimary = numDocsOnPrimary(shards, preSyncResponses);
         for (final ShardRouting shard : shards) {
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                 results.put(shard, new ShardSyncedFlushResponse("unknown node"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
-            final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
-            if (expectedCommitId == null) {
+            final PreSyncedFlushResponse preSyncedResponse = preSyncResponses.get(shard.currentNodeId());
+            if (preSyncedResponse == null) {
                 logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                 results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                continue;
+            }
+            if (preSyncedResponse.numDocs != numDocsOnPrimary
+                && preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) {
+                logger.warn("{} can't issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]",
+                    shardId, syncId, shard, preSyncedResponse.numDocs, numDocsOnPrimary);
+                results.put(shard, new ShardSyncedFlushResponse("out of sync replica; " +
+                    "num docs on replica [" + preSyncedResponse.numDocs + "]; num docs on primary [" + numDocsOnPrimary + "]"));
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
             logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
-            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
+            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId),
                 new TransportResponseHandler<ShardSyncedFlushResponse>() {
                     @Override
                     public ShardSyncedFlushResponse newInstance() {
@@ -332,14 +355,14 @@
                         ShardSyncedFlushResponse existing = results.put(shard, response);
                         assert existing == null : "got two answers for node [" + node + "]";
                         // count after the assert so we won't decrement twice in handleException
-                        contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                        countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                     }
 
                     @Override
                     public void handleException(TransportException exp) {
                         logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping", shardId, shard), exp);
                         results.put(shard, new ShardSyncedFlushResponse(exp.getMessage()));
-                        contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                        countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                     }
 
                     @Override
@@ -351,8 +374,8 @@
         }
     }
 
-    private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
-                                               ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
+    private void countDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
+                                                ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
         if (countDown.countDown()) {
             assert results.size() == shards.size();
             listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
@@ -362,16 +385,16 @@
     /**
      * send presync requests to all started copies of the given shard
     */
-    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, Engine.CommitId>> listener) {
+    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, PreSyncedFlushResponse>> listener) {
         final CountDown countDown = new CountDown(shards.size());
-        final ConcurrentMap<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
+        final ConcurrentMap<String, PreSyncedFlushResponse> presyncResponses = ConcurrentCollections.newConcurrentMap();
         for (final ShardRouting shard : shards) {
             logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
                 if (countDown.countDown()) {
-                    listener.onResponse(commitIds);
+                    listener.onResponse(presyncResponses);
                 }
                 continue;
             }
@@ -383,11 +406,11 @@
                 @Override
                 public void handleResponse(PreSyncedFlushResponse response) {
-                    Engine.CommitId existing = commitIds.putIfAbsent(node.getId(), response.commitId());
+                    PreSyncedFlushResponse existing = presyncResponses.putIfAbsent(node.getId(), response);
                     assert existing == null : "got two answers for node [" + node + "]";
                     // count after the assert so we won't decrement twice in handleException
                     if (countDown.countDown()) {
-                        listener.onResponse(commitIds);
+                        listener.onResponse(presyncResponses);
                     }
                 }
 
@@ -395,7 +418,7 @@
                 public void handleException(TransportException exp) {
                     logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping", shardId, shard), exp);
                     if (countDown.countDown()) {
-                        listener.onResponse(commitIds);
+                        listener.onResponse(presyncResponses);
                     }
                 }
 
@@ -411,9 +434,11 @@
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
         logger.trace("{} performing pre sync flush", request.shardId());
-        Engine.CommitId commitId = indexShard.flush(flushRequest);
-        logger.trace("{} pre sync flush done. commit id {}", request.shardId(), commitId);
-        return new PreSyncedFlushResponse(commitId);
+        indexShard.flush(flushRequest);
+        final CommitStats commitStats = indexShard.commitStats();
+        final Engine.CommitId commitId = commitStats.getRawCommitId();
+        logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
+        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
     }
 
     private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs()); + return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs()); } private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) { @@ -483,30 +508,45 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL * Response for first step of synced flush (flush) for one shard copy */ static final class PreSyncedFlushResponse extends TransportResponse { + static final int UNKNOWN_NUM_DOCS = -1; Engine.CommitId commitId; + int numDocs; PreSyncedFlushResponse() { } - PreSyncedFlushResponse(Engine.CommitId commitId) { + PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) { this.commitId = commitId; + this.numDocs = numDocs; } - public Engine.CommitId commitId() { + Engine.CommitId commitId() { return commitId; } + int numDocs() { + return numDocs; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); commitId = new Engine.CommitId(in); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + numDocs = in.readInt(); + } else { + numDocs = UNKNOWN_NUM_DOCS; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); commitId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeInt(numDocs); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9b6daba22ea..127f33b1365 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -602,9 +602,10 @@ public class InternalEngineTests extends EngineTestCase { globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbers.NO_OPS_PERFORMED ? 
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 9b6daba22ea..127f33b1365 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -602,9 +602,10 @@ public class InternalEngineTests extends EngineTestCase {
         globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbers.NO_OPS_PERFORMED ?
             SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get()));
 
-        engine.flush(true, true);
+        final Engine.CommitId commitId = engine.flush(true, true);
 
         CommitStats stats2 = engine.commitStats();
+        assertThat(stats2.getRawCommitId(), equalTo(commitId));
         assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration()));
         assertThat(stats2.getId(), notNullValue());
         assertThat(stats2.getId(), not(equalTo(stats1.getId())));
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
index 91004e54d43..c56602f789e 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.indices.flush;
 
+import org.apache.lucene.index.Term;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
@@ -35,7 +36,13 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.io.IOException;
@@ -47,9 +54,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class FlushIT extends ESIntegTestCase {
     public void testWaitIfOngoing() throws InterruptedException {
@@ -224,4 +234,50 @@ public class FlushIT extends ESIntegTestCase {
         assertThat(shardsResult.size(), equalTo(numShards));
         assertThat(shardsResult.get(0).failureReason(), equalTo("no active shards"));
     }
+
+    private void indexDoc(Engine engine, String id) throws IOException {
+        final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
+        final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc));
+        assertThat(indexResult.getFailure(), nullValue());
+    }
+
+    public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
+        final int numberOfReplicas = internalCluster().numDataNodes() - 1;
+        assertAcked(
+            prepareCreate("test").setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
+        );
+        ensureGreen();
+        final Index index = clusterService().state().metaData().index("test").getIndex();
+        final ShardId shardId = new ShardId(index, 0);
+        final int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final List<IndexShard> indexShards = internalCluster().nodesInclude("test").stream()
+            .map(node -> internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId))
+            .collect(Collectors.toList());
+        // Index extra documents to one replica - synced-flush should fail on that replica.
+        final IndexShard outOfSyncReplica = randomValueOtherThanMany(s -> s.routingEntry().primary(), () -> randomFrom(indexShards));
+        final int extraDocs = between(1, 10);
+        for (int i = 0; i < extraDocs; i++) {
+            indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
+        }
+        final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
+        assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
+        assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
+            "out of sync replica; num docs on replica [" + (numDocs + extraDocs) + "]; num docs on primary [" + numDocs + "]"));
+        // Index extra documents to all shards - synced-flush should be ok.
+        for (IndexShard indexShard : indexShards) {
+            for (int i = 0; i < extraDocs; i++) {
+                indexDoc(IndexShardTestCase.getEngine(indexShard), "extra_" + i);
+            }
+        }
+        final ShardsSyncedFlushResult fullResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
+        assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java
index d16b117fd8d..3d7ef82ea12 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java
@@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
 
@@ -53,12 +54,12 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
         final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
         assertEquals("exactly one active shard", 1, activeShards.size());
-        Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
-        assertEquals("exactly one commit id", 1, commitIds.size());
+        Map<String, SyncedFlushService.PreSyncedFlushResponse> preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
+        assertEquals("exactly one commit id", 1, preSyncedResponses.size());
         client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get();
         String syncId = UUIDs.randomBase64UUID();
         SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         ShardsSyncedFlushResult syncedFlushResult = listener.result;
@@ -72,7 +73,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
 
         SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't sync-flush with the old one
         listener = new SyncedFlushUtil.LatchedListener();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         syncedFlushResult = listener.result;
@@ -172,15 +173,15 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
         final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
         assertEquals("exactly one active shard", 1, activeShards.size());
-        Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
-        assertEquals("exactly one commit id", 1, commitIds.size());
+        Map<String, SyncedFlushService.PreSyncedFlushResponse> preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
+        assertEquals("exactly one commit id", 1, preSyncedResponses.size());
         if (randomBoolean()) {
             client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get();
         }
         client().admin().indices().prepareFlush("test").setForce(true).get();
         String syncId = UUIDs.randomBase64UUID();
         final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         ShardsSyncedFlushResult syncedFlushResult = listener.result;
@@ -205,12 +206,12 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
         final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
         assertEquals("exactly one active shard", 1, activeShards.size());
-        Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
-        assertEquals("exactly one commit id", 1, commitIds.size());
-        commitIds.clear(); // wipe it...
+        Map<String, SyncedFlushService.PreSyncedFlushResponse> preSyncedResponses = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
+        assertEquals("exactly one commit id", 1, preSyncedResponses.size());
+        preSyncedResponses.clear(); // wipe it...
         String syncId = UUIDs.randomBase64UUID();
         SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener();
-        flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
+        flushService.sendSyncRequests(syncId, activeShards, state, preSyncedResponses, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
         assertNull(listener.error);
         ShardsSyncedFlushResult syncedFlushResult = listener.result;
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
index b2f2db73f2b..adaa612adb3 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
@@ -76,8 +76,8 @@ public class SyncedFlushUtil {
     /**
     * Blocking version of {@link SyncedFlushService#sendPreSyncRequests(List, ClusterState, ShardId, ActionListener)}
     */
-    public static Map<String, Engine.CommitId> sendPreSyncRequests(SyncedFlushService service, List<ShardRouting> activeShards, ClusterState state, ShardId shardId) {
-        LatchedListener<Map<String, Engine.CommitId>> listener = new LatchedListener<>();
+    public static Map<String, SyncedFlushService.PreSyncedFlushResponse> sendPreSyncRequests(SyncedFlushService service, List<ShardRouting> activeShards, ClusterState state, ShardId shardId) {
+        LatchedListener<Map<String, SyncedFlushService.PreSyncedFlushResponse>> listener = new LatchedListener<>();
         service.sendPreSyncRequests(activeShards, state, shardId, listener);
         try {
             listener.latch.await();
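SyncedFlushUtil turns the asynchronous sendPreSyncRequests callback into a blocking call for tests by counting down a latch from the listener, which is why the callers above simply call listener.latch.await(). A generic sketch of that pattern (the names are placeholders, not the helper's actual API):

    import java.util.concurrent.CountDownLatch;

    class LatchedListenerSketch<T> {
        private volatile T result;
        private volatile Exception error;
        private final CountDownLatch latch = new CountDownLatch(1);

        void onResponse(T value) { result = value; latch.countDown(); }   // success path
        void onFailure(Exception e) { error = e; latch.countDown(); }     // failure path

        T get() throws Exception {
            latch.await();   // block the test thread until either callback fires
            if (error != null) {
                throw error;
            }
            return result;
        }
    }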