Recovery: mark shard as recovering on the cluster state thread

Closes #14115
Closes #14276
This commit is contained in:
Boaz Leskes 2015-10-25 16:46:27 +01:00
parent 085f9e032b
commit 5380322947
7 changed files with 86 additions and 132 deletions

View File

@ -20,7 +20,10 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.*;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
@ -58,6 +61,7 @@ import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexServicesProvider;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
@ -83,8 +87,8 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
@ -387,7 +391,7 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
public IndexShardState markAsRecovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
@ -1068,17 +1072,17 @@ public class IndexShard extends AbstractIndexShardComponent {
return path;
}
public boolean recoverFromStore(ShardRouting shard, DiscoveryNode localNode) {
public boolean recoverFromStore(DiscoveryNode localNode) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
final boolean shouldExist = shard.allocatedPostIndexCreate();
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
final boolean shouldExist = shardRouting.allocatedPostIndexCreate();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
}
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository, DiscoveryNode locaNode) {
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
public boolean restoreFromRepository(IndexShardRepository repository, DiscoveryNode locaNode) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository, locaNode);
}

View File

@ -72,13 +72,6 @@ final class StoreRecovery {
if (indexShard.routingEntry().restoreSource() != null) {
throw new IllegalStateException("can't recover - restore source is not null");
}
try {
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode);
indexShard.recovering("from store", recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
}
return executeRecovery(indexShard, () -> {
logger.debug("starting recovery from store ...");
internalRecoverFromStore(indexShard, indexShouldExists);
@ -101,13 +94,6 @@ final class StoreRecovery {
if (shardRouting.restoreSource() == null) {
throw new IllegalStateException("can't restore - restore source is null");
}
try {
final RecoveryState recoveryState = new RecoveryState(shardId, shardRouting.primary(), RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode);
indexShard.recovering("from snapshot", recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
return false;
}
return executeRecovery(indexShard, () -> {
logger.debug("restoring from {} ...", shardRouting.restoreSource());
restore(indexShard, repository);

View File

@ -636,6 +636,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
return;
}
final RestoreSource restoreSource = shardRouting.restoreSource();
if (isPeerRecovery(shardRouting)) {
try {
@ -646,41 +648,53 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.localNode());
indexShard.markAsRecovering("from " + sourceNode, recoveryState);
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
indexShard.failShard("corrupted preexisting index", e);
handleRecoveryFailure(indexService, shardRouting, true, e);
}
} else {
final DiscoveryNode localNode = clusterService.localNode();
threadPool.generic().execute(() -> {
final RestoreSource restoreSource = shardRouting.restoreSource();
final ShardId sId = indexShard.shardId();
try {
final boolean success;
if (restoreSource == null) {
} else if (restoreSource == null) {
assert indexShard.routingEntry().equals(shardRouting); // should have already be done before
// recover from filesystem store
success = indexShard.recoverFromStore(shardRouting, localNode);
} else {
// restore
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
RecoveryState.Type.STORE,
nodes.localNode(), nodes.localNode());
indexShard.markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
success = indexShard.restoreFromRepository(shardRouting, indexShardRepository, localNode);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), sId);
}
throw t;
}
if (success) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
}
}
if (success) {
if (indexShard.recoverFromStore(nodes.localNode())) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
}
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
} catch (Throwable t) {
handleRecoveryFailure(indexService, shardRouting, true, t);
}
});
} else {
// recover from a restore
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.localNode());
indexShard.markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
final ShardId sId = indexShard.shardId();
try {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository");
}
} catch (Throwable first) {
try {
if (Lucene.isCorruptionException(first)) {
restoreService.failRestore(restoreSource.snapshotId(), sId);
}
} catch (Throwable second) {
first.addSuppressed(second);
} finally {
handleRecoveryFailure(indexService, shardRouting, true, first);
}
}
});
}

View File

@ -124,14 +124,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe
}
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
try {
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), indexShard.routingEntry().primary(), recoveryType, sourceNode, clusterService.localNode());
indexShard.recovering("from " + sourceNode, recoveryState);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());
return;
}
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));

View File

@ -22,32 +22,15 @@ import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
@ -70,35 +53,16 @@ import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_UPGRADED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
/**
@ -113,7 +77,7 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository, DiscoveryNode)} )}
* {@link IndexShard#restoreFromRepository(IndexShardRepository, DiscoveryNode)} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>

View File

@ -20,13 +20,7 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldFilterLeafReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
@ -49,12 +43,7 @@ import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -80,11 +69,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@ -110,16 +95,11 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
/**
@ -779,7 +759,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore(localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
@ -809,8 +790,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
try {
newShard.recoverFromStore(routing, localNode);
newShard.recoverFromStore(localNode);
fail("index not there!");
} catch (IndexShardRecoveryException ex) {
assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over"));
@ -818,12 +800,18 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so"));
ShardRoutingHelper.initialize(routing, origRouting.currentNodeId());
assertFalse("it's already recovering", newShard.recoverFromStore(routing, localNode));
assertTrue("it's already recovering, we should ignore new ones", newShard.ignoreRecoveryAttempt());
try {
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
fail("we are already recovering, can't mark again");
} catch (IllegalIndexShardStateException e) {
// OK!
}
test.removeShard(0, "I broken it");
newShard = test.createShard(0, routing);
newShard.updateRoutingEntry(routing, false);
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(routing, localNode));
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
@ -859,7 +847,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test_target_shard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() {
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() {
@Override
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
}
@ -1031,7 +1020,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore(localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);

View File

@ -31,7 +31,9 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
@ -95,7 +97,9 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(0, newRouting);
shard.updateRoutingEntry(newRouting, true);
shard.recoverFromStore(newRouting, new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT));
final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode));
shard.recoverFromStore(localNode);
newRouting = new ShardRouting(newRouting);
ShardRoutingHelper.moveToStarted(newRouting);
shard.updateRoutingEntry(newRouting, true);