From a22f6fbe3c5c49038c51e258a78fe67d8235fe0c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 2 Nov 2019 17:28:12 +0100 Subject: [PATCH] Cleanup Redundant Futures in Recovery Code (#48805) (#48832) Follow up to #48110 cleaning up the redundant future uses that were left over from that change. --- .../elasticsearch/index/shard/IndexShard.java | 71 ++++++++----------- .../index/shard/StoreRecovery.java | 33 ++++----- .../index/shard/IndexShardIT.java | 3 +- .../index/shard/IndexShardTests.java | 32 +++++---- .../IndexingMemoryControllerTests.java | 2 +- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 8 ++- .../action/bulk/BulkShardOperationsTests.java | 2 +- 9 files changed, 74 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e693f0aa15a..0f5bb284fcf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; @@ -1821,34 +1822,38 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return path; } - public boolean recoverFromLocalShards(BiConsumer mappingUpdateConsumer, - List localShards) throws IOException { + public void recoverFromLocalShards(BiConsumer mappingUpdateConsumer, List localShards, + ActionListener listener) throws IOException { assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard"; assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " + recoveryState.getRecoverySource(); final List snapshots = new ArrayList<>(); + final ActionListener recoveryListener = ActionListener.runBefore(listener, () -> IOUtils.close(snapshots)); + boolean success = false; try { for (IndexShard shard : localShards) { snapshots.add(new LocalShardSnapshot(shard)); } - // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots); + storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener); + success = true; } finally { - IOUtils.close(snapshots); + if (success == false) { + IOUtils.close(snapshots); + } } } - public boolean recoverFromStore() { + public void recoverFromStore(ActionListener listener) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; assert shardRouting.initializing() : "can only start recovery on initializing shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromStore(this); + storeRecovery.recoverFromStore(this, listener); } public void restoreFromRepository(Repository repository, ActionListener listener) { @@ -2520,17 +2525,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl switch (recoveryState.getRecoverySource().getType()) { case EMPTY_STORE: case EXISTING_STORE: - markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - if (recoverFromStore()) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Exception e) { - recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true); - } - }); + executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; case PEER: try { @@ -2543,17 +2538,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } break; case SNAPSHOT: - markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread - SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); - threadPool.generic().execute( - ActionRunnable.wrap(ActionListener.wrap(r -> { - if (r) { - recoveryListener.onRecoveryDone(recoveryState); - } - }, - e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), - restoreListener -> restoreFromRepository( - repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener))); + final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository(); + executeRecovery("from snapshot", + recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l)); break; case LOCAL_SHARDS: final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); @@ -2578,18 +2565,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (numShards == startedShards.size()) { assert requiredShards.isEmpty() == false; - markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream() - .filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Exception e) { - recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true); - } - }); + executeRecovery("from local shards", recoveryState, recoveryListener, + l -> recoverFromLocalShards(mappingUpdateConsumer, + startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l)); } else { final RuntimeException e; if (numShards == -1) { @@ -2607,6 +2585,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener, + CheckedConsumer, Exception> action) { + markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { + if (r) { + recoveryListener.onRecoveryDone(recoveryState); + } + }, + e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); + } + /** * Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}). */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index cc4cd1a0e6f..c0c44730126 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -33,7 +33,6 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -83,31 +82,27 @@ final class StoreRecovery { * exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index * files / transaction logs. This * @param indexShard the index shard instance to recovery the shard into - * @return true if the shard has been recovered successfully, false if the recovery - * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. + * @param listener resolves to true if the shard has been recovered successfully, false if the recovery + * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. * @see Store */ - boolean recoverFromStore(final IndexShard indexShard) { + void recoverFromStore(final IndexShard indexShard, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType; - final PlainActionFuture future = PlainActionFuture.newFuture(); - final ActionListener recoveryListener = recoveryListener(indexShard, future); - try { + ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from store ..."); internalRecoverFromStore(indexShard); - recoveryListener.onResponse(true); - } catch (Exception e) { - recoveryListener.onFailure(e); - } - return future.actionGet(); + return true; + }); + } else { + listener.onResponse(false); } - return false; } - boolean recoverFromLocalShards(BiConsumer mappingUpdateConsumer, - final IndexShard indexShard, final List shards) { + void recoverFromLocalShards(BiConsumer mappingUpdateConsumer, IndexShard indexShard, + List shards, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.LOCAL_SHARDS: "expected local shards recovery type: " + recoveryType; @@ -129,8 +124,7 @@ final class StoreRecovery { final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " + "single type but the index is created before 6.0.0"; - final PlainActionFuture future = PlainActionFuture.newFuture(); - ActionListener.completeWith(recoveryListener(indexShard, future), () -> { + ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! @@ -150,10 +144,9 @@ final class StoreRecovery { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } }); - assert future.isDone(); - return future.actionGet(); + } else { + listener.onResponse(false); } - return false; } void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index b0c13415330..a95e8c69a03 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -119,6 +119,7 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; +import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -642,7 +643,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + recoverFromStore(newShard); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); return newShard; } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 493fd15d6de..7f4dfff5035 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1373,7 +1373,7 @@ public class IndexShardTests extends IndexShardTestCase { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); @@ -2051,7 +2051,7 @@ public class IndexShardTests extends IndexShardTestCase { RecoverySource.ExistingStoreRecoverySource.INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2077,7 +2077,7 @@ public class IndexShardTests extends IndexShardTestCase { IndexShard newShard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2108,7 +2108,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, totalOps); assertThat(newShard.getHistoryUUID(), not(equalTo(historyUUID))); @@ -2156,7 +2156,7 @@ public class IndexShardTests extends IndexShardTestCase { RecoverySource.ExistingStoreRecoverySource.INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2181,7 +2181,7 @@ public class IndexShardTests extends IndexShardTestCase { newShard = reinitShard(newShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, RecoverySource.ExistingStoreRecoverySource.INSTANCE)); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(newShard.indexSettings.isSoftDeleteEnabled() ? 0 : 2)); } @@ -2202,7 +2202,7 @@ public class IndexShardTests extends IndexShardTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2229,7 +2229,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting routing = newShard.routingEntry(); newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); try { - newShard.recoverFromStore(); + recoverFromStore(newShard); fail("index not there!"); } catch (IndexShardRecoveryException ex) { assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over")); @@ -2249,7 +2249,7 @@ public class IndexShardTests extends IndexShardTestCase { newShard = reinitShard(newShard, ShardRoutingHelper.initWithSameId(routing, RecoverySource.EmptyStoreRecoverySource.INSTANCE)); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore()); + assertTrue("recover even if there is nothing to recover", recoverFromStore(newShard)); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 0); @@ -2300,7 +2300,7 @@ public class IndexShardTests extends IndexShardTestCase { EMPTY_EVENT_LISTENER); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertThat(getShardDocUIDs(newShard), containsInAnyOrder("doc-0", "doc-2")); closeShards(newShard); } @@ -2632,7 +2632,7 @@ public class IndexShardTests extends IndexShardTestCase { primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null)); - primary.recoverFromStore(); + recoverFromStore(primary); primary.recoveryState().getTranslog().totalOperations(snapshot.totalOperations()); primary.recoveryState().getTranslog().totalOperationsOnStart(snapshot.totalOperations()); @@ -2820,11 +2820,15 @@ public class IndexShardTests extends IndexShardTestCase { final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true); recoverShardFromStore(differentIndex); expectThrows(IllegalArgumentException.class, () -> { - targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex)); + final PlainActionFuture future = PlainActionFuture.newFuture(); + targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex), future); + future.actionGet(); }); closeShards(differentIndex); - assertTrue(targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard))); + final PlainActionFuture future = PlainActionFuture.newFuture(); + targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard), future); + assertTrue(future.actionGet()); RecoveryState recoveryState = targetShard.recoveryState(); assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage()); assertTrue(recoveryState.getIndex().fileDetails().size() > 0); @@ -4137,7 +4141,7 @@ public class IndexShardTests extends IndexShardTestCase { ); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null)); - assertTrue(readonlyShard.recoverFromStore()); + recoverFromStore(readonlyShard); assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs)); closeShards(readonlyShard); } diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index a48196e0ff7..8fa58bd05e5 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -451,7 +451,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); assertEquals(1, imc.availableShards().size()); - assertTrue(newShard.recoverFromStore()); + assertTrue(IndexShardTestCase.recoverFromStore(newShard)); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); } finally { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 122d74121a7..436126930e3 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -137,7 +137,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null)); - shard.recoverFromStore(); + IndexShardTestCase.recoverFromStore(shard); newRouting = ShardRoutingHelper.moveToStarted(newRouting); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(6, counter.get()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index f8e1ca41459..de53078f983 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -323,7 +323,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected synchronized void recoverPrimary(IndexShard primary) { final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); - primary.recoverFromStore(); + recoverFromStore(primary); } public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index b10d79a8156..83aa854e8ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -546,7 +546,7 @@ public abstract class IndexShardTestCase extends ESTestCase { primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null)); - primary.recoverFromStore(); + recoverFromStore(primary); updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry())); } @@ -792,6 +792,12 @@ public abstract class IndexShardTestCase extends ESTestCase { shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force)); } + public static boolean recoverFromStore(IndexShard newShard) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + newShard.recoverFromStore(future); + return future.actionGet(); + } + /** Recover a shard from a snapshot using a given repository **/ protected void recoverShardFromSnapshot(final IndexShard shard, final Snapshot snapshot, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 43302a5177e..6ef24f278c2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -132,7 +132,7 @@ public class BulkShardOperationsTests extends IndexShardTestCase { final IndexShard newPrimary = reinitShard(oldPrimary); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null)); - assertTrue(newPrimary.recoverFromStore()); + assertTrue(recoverFromStore(newPrimary)); IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted()); newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); // The second bulk includes some operations from the first bulk which were processed already;