Cleanup Redundant Futures in Recovery Code (#48805) (#48832)

Follow up to #48110 cleaning up the redundant future
uses that were left over from that change.
This commit is contained in:
Armin Braun 2019-11-02 17:28:12 +01:00 committed by GitHub
parent 4c70770877
commit a22f6fbe3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 74 additions and 81 deletions

View File

@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
@ -1821,34 +1822,38 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return path;
}
public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
List<IndexShard> localShards) throws IOException {
public void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards,
ActionListener<Boolean> listener) throws IOException {
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " +
recoveryState.getRecoverySource();
final List<LocalShardSnapshot> snapshots = new ArrayList<>();
final ActionListener<Boolean> recoveryListener = ActionListener.runBefore(listener, () -> IOUtils.close(snapshots));
boolean success = false;
try {
for (IndexShard shard : localShards) {
snapshots.add(new LocalShardSnapshot(shard));
}
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener);
success = true;
} finally {
IOUtils.close(snapshots);
if (success == false) {
IOUtils.close(snapshots);
}
}
}
public boolean recoverFromStore() {
public void recoverFromStore(ActionListener<Boolean> listener) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this);
storeRecovery.recoverFromStore(this, listener);
}
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
@ -2520,17 +2525,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case PEER:
try {
@ -2543,17 +2538,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
break;
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
restoreListener -> restoreFromRepository(
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
executeRecovery("from snapshot",
recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
@ -2578,18 +2565,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (numShards == startedShards.size()) {
assert requiredShards.isEmpty() == false;
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
.filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
executeRecovery("from local shards", recoveryState, recoveryListener,
l -> recoverFromLocalShards(mappingUpdateConsumer,
startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));
} else {
final RuntimeException e;
if (numShards == -1) {
@ -2607,6 +2585,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener,
CheckedConsumer<ActionListener<Boolean>, Exception> action) {
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}
/**
* Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}).
*/

View File

@ -33,7 +33,6 @@ import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -83,31 +82,27 @@ final class StoreRecovery {
* exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index
* files / transaction logs. This
* @param indexShard the index shard instance to recovery the shard into
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @param listener resolves to <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @see Store
*/
boolean recoverFromStore(final IndexShard indexShard) {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE :
"expected store recovery type but was: " + recoveryType;
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final ActionListener<Boolean> recoveryListener = recoveryListener(indexShard, future);
try {
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from store ...");
internalRecoverFromStore(indexShard);
recoveryListener.onResponse(true);
} catch (Exception e) {
recoveryListener.onFailure(e);
}
return future.actionGet();
return true;
});
} else {
listener.onResponse(false);
}
return false;
}
boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
final IndexShard indexShard, final List<LocalShardSnapshot> shards) {
void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndexShard indexShard,
List<LocalShardSnapshot> shards, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.LOCAL_SHARDS: "expected local shards recovery type: " + recoveryType;
@ -129,8 +124,7 @@ final class StoreRecovery {
final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards();
assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " +
"single type but the index is created before 6.0.0";
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
ActionListener.completeWith(recoveryListener(indexShard, future), () -> {
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
@ -150,10 +144,9 @@ final class StoreRecovery {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
}
});
assert future.isDone();
return future.actionGet();
} else {
listener.onResponse(false);
}
return false;
}
void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,

View File

@ -119,6 +119,7 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@ -642,7 +643,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
recoverFromStore(newShard);
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
return newShard;
}

View File

@ -1373,7 +1373,7 @@ public class IndexShardTests extends IndexShardTestCase {
snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
@ -2051,7 +2051,7 @@ public class IndexShardTests extends IndexShardTestCase {
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
@ -2077,7 +2077,7 @@ public class IndexShardTests extends IndexShardTestCase {
IndexShard newShard = reinitShard(shard);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
@ -2108,7 +2108,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, totalOps);
assertThat(newShard.getHistoryUUID(), not(equalTo(historyUUID)));
@ -2156,7 +2156,7 @@ public class IndexShardTests extends IndexShardTestCase {
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(1, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart());
@ -2181,7 +2181,7 @@ public class IndexShardTests extends IndexShardTestCase {
newShard = reinitShard(newShard, ShardRoutingHelper.initWithSameId(primaryShardRouting,
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(newShard.indexSettings.isSoftDeleteEnabled() ? 0 : 2));
}
@ -2202,7 +2202,7 @@ public class IndexShardTests extends IndexShardTestCase {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
@ -2229,7 +2229,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting routing = newShard.routingEntry();
newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null));
try {
newShard.recoverFromStore();
recoverFromStore(newShard);
fail("index not there!");
} catch (IndexShardRecoveryException ex) {
assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over"));
@ -2249,7 +2249,7 @@ public class IndexShardTests extends IndexShardTestCase {
newShard = reinitShard(newShard,
ShardRoutingHelper.initWithSameId(routing, RecoverySource.EmptyStoreRecoverySource.INSTANCE));
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore());
assertTrue("recover even if there is nothing to recover", recoverFromStore(newShard));
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 0);
@ -2300,7 +2300,7 @@ public class IndexShardTests extends IndexShardTestCase {
EMPTY_EVENT_LISTENER);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertTrue(recoverFromStore(newShard));
assertThat(getShardDocUIDs(newShard), containsInAnyOrder("doc-0", "doc-2"));
closeShards(newShard);
}
@ -2632,7 +2632,7 @@ public class IndexShardTests extends IndexShardTestCase {
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
null));
primary.recoverFromStore();
recoverFromStore(primary);
primary.recoveryState().getTranslog().totalOperations(snapshot.totalOperations());
primary.recoveryState().getTranslog().totalOperationsOnStart(snapshot.totalOperations());
@ -2820,11 +2820,15 @@ public class IndexShardTests extends IndexShardTestCase {
final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true);
recoverShardFromStore(differentIndex);
expectThrows(IllegalArgumentException.class, () -> {
targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex), future);
future.actionGet();
});
closeShards(differentIndex);
assertTrue(targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard)));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard), future);
assertTrue(future.actionGet());
RecoveryState recoveryState = targetShard.recoveryState();
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
assertTrue(recoveryState.getIndex().fileDetails().size() > 0);
@ -4137,7 +4141,7 @@ public class IndexShardTests extends IndexShardTestCase {
);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null));
assertTrue(readonlyShard.recoverFromStore());
recoverFromStore(readonlyShard);
assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs));
closeShards(readonlyShard);
}

View File

@ -451,7 +451,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertEquals(1, imc.availableShards().size());
assertTrue(newShard.recoverFromStore());
assertTrue(IndexShardTestCase.recoverFromStore(newShard));
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted());
} finally {

View File

@ -137,7 +137,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null));
shard.recoverFromStore();
IndexShardTestCase.recoverFromStore(shard);
newRouting = ShardRoutingHelper.moveToStarted(newRouting);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(6, counter.get());

View File

@ -323,7 +323,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
protected synchronized void recoverPrimary(IndexShard primary) {
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
recoverFromStore(primary);
}
public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {

View File

@ -546,7 +546,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
null));
primary.recoverFromStore();
recoverFromStore(primary);
updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry()));
}
@ -792,6 +792,12 @@ public abstract class IndexShardTestCase extends ESTestCase {
shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force));
}
public static boolean recoverFromStore(IndexShard newShard) {
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
newShard.recoverFromStore(future);
return future.actionGet();
}
/** Recover a shard from a snapshot using a given repository **/
protected void recoverShardFromSnapshot(final IndexShard shard,
final Snapshot snapshot,

View File

@ -132,7 +132,7 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
final IndexShard newPrimary = reinitShard(oldPrimary);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null));
assertTrue(newPrimary.recoverFromStore());
assertTrue(recoverFromStore(newPrimary));
IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted());
newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
// The second bulk includes some operations from the first bulk which were processed already;