Simplify recovery logic in IndicesClusterStateService (#18405)
- Moves recovery logic into IndexShard
- Simplifies logic to cancel peer recovery of shard where recovery source node changed
- Ensures routing entry is set on initialization of IndexShard
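At a glance, the patch collapses the per-recovery-type branches in IndicesClusterStateService into a single hand-off to the shard itself. The sketch below only condenses code that appears in the hunks further down; all names and signatures are taken from the patch, nothing is invented:

    // IndicesClusterStateService.applyInitializingShard(...) now ends with one call:
    indexShard.startRecovery(nodes.getLocalNode(), sourceNode, recoveryTargetService,
        new RecoveryListener(shardRouting, indexService), repositoriesService);

    // IndexShard.startRecovery(...) then picks the recovery type from its own routing entry:
    //   shardRouting.isPeerRecovery()  -> recoveryTargetService.startRecovery(this, type, sourceNode, listener)
    //   restoreSource == null          -> recoverFromStore(localNode), run on the generic thread pool
    //   otherwise                      -> restoreFromRepository(indexShardRepository, localNode)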
parent 850e9d7c57
commit 6dacac49b3
@@ -252,6 +252,13 @@ public final class ShardRouting implements Writeable, ToXContent {
         return true;
     }
 
+    /**
+     * returns true for initializing shards that recover their data from another shard copy
+     */
+    public boolean isPeerRecovery() {
+        return state == ShardRoutingState.INITIALIZING && (primary() == false || relocatingNodeId != null);
+    }
+
     /**
      * A shard iterator with just this shard in it.
      */
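For orientation, here is a condensed restatement (not itself part of the patch) of how the new predicate is consumed by IndexShard.startRecovery further down in this diff:

    // Recovery type selection, as it appears in the IndexShard hunk below:
    RecoveryState.Type type;
    if (shardRouting.isPeerRecovery()) {
        // initializing replica, or the target of a relocating primary: copy from another node
        type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
    } else if (shardRouting.restoreSource() == null) {
        type = RecoveryState.Type.STORE;     // local recovery from the shard's own store
    } else {
        type = RecoveryState.Type.SNAPSHOT;  // restore from a snapshot repository
    }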
@@ -336,18 +336,17 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
             store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
                 new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
             if (useShadowEngine(primary, indexSettings)) {
-                indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
+                indexShard = new ShadowIndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
                     indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
                     searchOperationListeners);
                 // no indexing listeners - shadow engines don't index
             } else {
-                indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
+                indexShard = new IndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
                     indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
                     searchOperationListeners, indexingOperationListeners);
             }
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
             eventListener.afterIndexShardCreated(indexShard);
-            indexShard.updateRoutingEntry(routing, true);
             shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
             success = true;
             return indexShard;
@@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RestoreSource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.Booleans;
@@ -103,6 +104,8 @@ import org.elasticsearch.index.warmer.WarmerStats;
 import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.indices.recovery.RecoveryTargetService;
+import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
 import org.elasticsearch.search.suggest.completion.CompletionStats;
 import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat;
@@ -194,12 +197,14 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     private final AtomicBoolean active = new AtomicBoolean();
 
-    public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
+    public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
                       MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
                       @Nullable EngineFactory engineFactory,
                       IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
-                      Engine.Warmer warmer, List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) {
-        super(shardId, indexSettings);
+                      Engine.Warmer warmer, List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) throws IOException {
+        super(shardRouting.shardId(), indexSettings);
+        assert shardRouting.initializing();
+        this.shardRouting = shardRouting;
         final Settings settings = indexSettings.getSettings();
         this.codecService = new CodecService(mapperService, logger);
         this.warmer = warmer;
@@ -243,6 +248,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         suspendableRefContainer = new SuspendableRefContainer();
         searcherWrapper = indexSearcherWrapper;
         primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
+        persistMetadata(shardRouting, null);
     }
 
     public Store store() {
@@ -313,8 +319,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     }
 
     /**
-     * Returns the latest cluster routing entry received with this shard. Might be null if the
-     * shard was just created.
+     * Returns the latest cluster routing entry received with this shard.
      */
     public ShardRouting routingEntry() {
         return this.shardRouting;
@@ -1325,6 +1330,58 @@ public class IndexShard extends AbstractIndexShardComponent {
         return this.currentEngineReference.get();
     }
 
+    public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService,
+                              RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService) {
+        final RestoreSource restoreSource = shardRouting.restoreSource();
+
+        if (shardRouting.isPeerRecovery()) {
+            assert sourceNode != null : "peer recovery started but sourceNode is null";
+            // we don't mark this one as relocated at the end.
+            // For primaries: requests in any case are routed to both when its relocating and that way we handle
+            // the edge case where its mark as relocated, and we might need to roll it back...
+            // For replicas: we are recovering a backup from a primary
+            RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
+            RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), type, sourceNode, localNode);
+            try {
+                markAsRecovering("from " + sourceNode, recoveryState);
+                recoveryTargetService.startRecovery(this, type, sourceNode, recoveryListener);
+            } catch (Throwable e) {
+                failShard("corrupted preexisting index", e);
+                recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, e), true);
+            }
+        } else if (restoreSource == null) {
+            // recover from filesystem store
+            final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
+                RecoveryState.Type.STORE, localNode, localNode);
+            markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
+            threadPool.generic().execute(() -> {
+                try {
+                    if (recoverFromStore(localNode)) {
+                        recoveryListener.onRecoveryDone(recoveryState);
+                    }
+                } catch (Throwable t) {
+                    recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true);
+                }
+
+            });
+        } else {
+            // recover from a restore
+            final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
+                RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode);
+            markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
+            threadPool.generic().execute(() -> {
+                try {
+                    final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
+                    if (restoreFromRepository(indexShardRepository, localNode)) {
+                        recoveryListener.onRecoveryDone(recoveryState);
+                    }
+                } catch (Throwable first) {
+                    recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, first), true);
+                }
+            });
+        }
+    }
+
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Callback<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
 
@@ -1367,13 +1424,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         return engineFactory.newReadWriteEngine(config);
     }
 
-    /**
-     * Returns <code>true</code> iff this shard allows primary promotion, otherwise <code>false</code>
-     */
-    public boolean allowsPrimaryPromotion() {
-        return true;
-    }
-
     // pkg private for testing
     void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException {
         assert newRouting != null : "newRouting must not be null";
@@ -46,12 +46,12 @@ import java.util.List;
  */
 public final class ShadowIndexShard extends IndexShard {
 
-    public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
+    public ShadowIndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
                             MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
                             @Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper,
                             ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer,
                             List<SearchOperationListener> searchOperationListeners) throws IOException {
-        super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
+        super(shardRouting, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
             indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, Collections.emptyList());
     }
 
@@ -92,10 +92,6 @@ public final class ShadowIndexShard extends IndexShard {
         return false;
     }
 
-    public boolean allowsPrimaryPromotion() {
-        return false;
-    }
-
     @Override
     public TranslogStats translogStats() {
         return null; // shadow engine has no translog
@@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
-import org.elasticsearch.cluster.routing.RestoreSource;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -40,6 +39,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.Callback;
@@ -57,7 +57,6 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
-import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.flush.SyncedFlushService;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
@@ -512,21 +511,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 assert currentRoutingEntry.isSameAllocation(shardRouting) :
                     "local shard has a different allocation id but wasn't cleaning by applyDeletedShards. "
                         + "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
-                if (isPeerRecovery(shardRouting)) {
-                    final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
-                    // check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
-                    if (recoveryTargetService.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", status -> !status.sourceNode().equals(sourceNode))) {
-                        logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
-                        // closing the shard will also cancel any ongoing recovery.
-                        indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
-                        shardHasBeenRemoved = true;
+                if (shardRouting.isPeerRecovery()) {
+                    RecoveryState recoveryState = indexShard.recoveryState();
+                    final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
+                    if (recoveryState.getSourceNode().equals(sourceNode) == false) {
+                        if (recoveryTargetService.cancelRecoveriesForShard(currentRoutingEntry.shardId(), "recovery source node changed")) {
+                            // getting here means that the shard was still recovering
+                            logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
+                            indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
+                            shardHasBeenRemoved = true;
+                        }
                     }
                 }
 
                 if (shardHasBeenRemoved == false) {
-                    // shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there.
-                    assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() :
-                        "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
                     try {
                         indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
                     } catch (Throwable e) {
@@ -536,12 +534,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
             }
 
             if (shardRouting.initializing()) {
-                applyInitializingShard(event.state(), indexMetaData, indexService, shardRouting);
+                applyInitializingShard(event.state(), indexService, shardRouting);
             }
         }
     }
 
-    private void applyInitializingShard(final ClusterState state, final IndexMetaData indexMetaData, IndexService indexService, final ShardRouting shardRouting) {
+    private void applyInitializingShard(final ClusterState state, IndexService indexService, final ShardRouting shardRouting) {
         final RoutingTable routingTable = state.routingTable();
         final DiscoveryNodes nodes = state.getNodes();
         final int shardId = shardRouting.id();
@@ -572,8 +570,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
 
         // if we're in peer recovery, try to find out the source node now so in case it fails, we will not create the index shard
         DiscoveryNode sourceNode = null;
-        if (isPeerRecovery(shardRouting)) {
-            sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
+        if (shardRouting.isPeerRecovery()) {
+            sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
             if (sourceNode == null) {
                 logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
                 return;
@@ -612,93 +610,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
             return;
         }
 
-        final RestoreSource restoreSource = shardRouting.restoreSource();
-
-        if (isPeerRecovery(shardRouting)) {
-            try {
-
-                assert sourceNode != null : "peer recovery started but sourceNode is null";
-
-                // we don't mark this one as relocated at the end.
-                // For primaries: requests in any case are routed to both when its relocating and that way we handle
-                // the edge case where its mark as relocated, and we might need to roll it back...
-                // For replicas: we are recovering a backup from a primary
-                RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
-                RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.getLocalNode());
-                indexShard.markAsRecovering("from " + sourceNode, recoveryState);
-                recoveryTargetService.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
-            } catch (Throwable e) {
-                indexShard.failShard("corrupted preexisting index", e);
-                handleRecoveryFailure(indexService, shardRouting, true, e);
-            }
-        } else if (restoreSource == null) {
-            assert indexShard.routingEntry().equals(shardRouting); // should have already be done before
-            // recover from filesystem store
-            final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
-                RecoveryState.Type.STORE,
-                nodes.getLocalNode(), nodes.getLocalNode());
-            indexShard.markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
-            threadPool.generic().execute(() -> {
-                try {
-                    if (indexShard.recoverFromStore(nodes.getLocalNode())) {
-                        shardStateAction.shardStarted(shardRouting, "after recovery from store", SHARD_STATE_ACTION_LISTENER);
-                    }
-                } catch (Throwable t) {
-                    handleRecoveryFailure(indexService, shardRouting, true, t);
-                }
-
-            });
-        } else {
-            // recover from a restore
-            final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
-                RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.getLocalNode());
-            indexShard.markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
-            threadPool.generic().execute(() -> {
-                final ShardId sId = indexShard.shardId();
-                try {
-                    final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
-                    if (indexShard.restoreFromRepository(indexShardRepository, nodes.getLocalNode())) {
-                        restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
-                        shardStateAction.shardStarted(shardRouting, "after recovery from repository", SHARD_STATE_ACTION_LISTENER);
-                    }
-                } catch (Throwable first) {
-                    try {
-                        if (Lucene.isCorruptionException(first)) {
-                            restoreService.failRestore(restoreSource.snapshotId(), sId);
-                        }
-                    } catch (Throwable second) {
-                        first.addSuppressed(second);
-                    } finally {
-                        handleRecoveryFailure(indexService, shardRouting, true, first);
-                    }
-                }
-            });
-        }
+        indexShard.startRecovery(nodes.getLocalNode(), sourceNode, recoveryTargetService,
+            new RecoveryListener(shardRouting, indexService), repositoriesService);
     }
 
     /**
      * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard
-     * routing to *require* peer recovery, use {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} to
+     * routing to *require* peer recovery, use {@link ShardRouting#isPeerRecovery()} to
      * check if its needed or not.
      */
-    private DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
+    private static DiscoveryNode findSourceNodeForPeerRecovery(ESLogger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
         DiscoveryNode sourceNode = null;
         if (!shardRouting.primary()) {
-            IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
-            for (ShardRouting entry : shardRoutingTable) {
-                if (entry.primary() && entry.active()) {
-                    // only recover from started primary, if we can't find one, we will do it next round
-                    sourceNode = nodes.get(entry.currentNodeId());
-                    if (sourceNode == null) {
-                        logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", entry);
-                        return null;
-                    }
-                    break;
+            ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
+            // only recover from started primary, if we can't find one, we will do it next round
+            if (primary.active()) {
+                sourceNode = nodes.get(primary.currentNodeId());
+                if (sourceNode == null) {
+                    logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary);
                 }
-            }
-
-            if (sourceNode == null) {
-                logger.trace("can't find replica source node for {} because a primary shard can not be found.", shardRouting.shardId());
+            } else {
+                logger.trace("can't find replica source node because primary shard {} is not active.", primary);
             }
         } else if (shardRouting.relocatingNodeId() != null) {
             sourceNode = nodes.get(shardRouting.relocatingNodeId());
@@ -711,30 +643,49 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         return sourceNode;
     }
 
-    private boolean isPeerRecovery(ShardRouting shardRouting) {
-        return !shardRouting.primary() || shardRouting.relocatingNodeId() != null;
-    }
-
-    private class PeerRecoveryListener implements RecoveryTargetService.RecoveryListener {
+    private class RecoveryListener implements RecoveryTargetService.RecoveryListener {
 
         private final ShardRouting shardRouting;
         private final IndexService indexService;
-        private final IndexMetaData indexMetaData;
 
-        private PeerRecoveryListener(ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
+        private RecoveryListener(ShardRouting shardRouting, IndexService indexService) {
             this.shardRouting = shardRouting;
             this.indexService = indexService;
-            this.indexMetaData = indexMetaData;
         }
 
         @Override
         public void onRecoveryDone(RecoveryState state) {
-            shardStateAction.shardStarted(shardRouting, "after recovery (replica) from node [" + state.getSourceNode() + "]", SHARD_STATE_ACTION_LISTENER);
+            if (state.getType() == RecoveryState.Type.SNAPSHOT) {
+                restoreService.indexShardRestoreCompleted(state.getRestoreSource().snapshotId(), shardRouting.shardId());
+            }
+            shardStateAction.shardStarted(shardRouting, message(state), SHARD_STATE_ACTION_LISTENER);
         }
 
+        private String message(RecoveryState state) {
+            switch (state.getType()) {
+                case SNAPSHOT: return "after recovery from repository";
+                case STORE: return "after recovery from store";
+                case PRIMARY_RELOCATION: return "after recovery (primary relocation) from node [" + state.getSourceNode() + "]";
+                case REPLICA: return "after recovery (replica) from node [" + state.getSourceNode() + "]";
+                default: throw new IllegalArgumentException(state.getType().name());
+            }
+        }
+
         @Override
         public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
-            handleRecoveryFailure(indexService, shardRouting, sendShardFailure, e);
+            if (state.getType() == RecoveryState.Type.SNAPSHOT) {
+                try {
+                    if (Lucene.isCorruptionException(e.getCause())) {
+                        restoreService.failRestore(state.getRestoreSource().snapshotId(), shardRouting.shardId());
+                    }
+                } catch (Throwable inner) {
+                    e.addSuppressed(inner);
+                } finally {
+                    handleRecoveryFailure(indexService, shardRouting, sendShardFailure, e);
+                }
+            } else {
+                handleRecoveryFailure(indexService, shardRouting, sendShardFailure, e);
+            }
         }
     }
 
@@ -136,37 +136,18 @@ public class RecoveriesCollection {
         return onGoingRecoveries.size();
     }
 
-    /** cancel all ongoing recoveries for the given shard. typically because the shards is closed */
-    public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
-        return cancelRecoveriesForShard(shardId, reason, status -> true);
-    }
-
     /**
-     * cancel all ongoing recoveries for the given shard, if their status match a predicate
+     * cancel all ongoing recoveries for the given shard
      *
      * @param reason reason for cancellation
      * @param shardId shardId for which to cancel recoveries
-     * @param shouldCancel a predicate to check if a recovery should be cancelled or not.
-     *                     Note that the recovery state can change after this check, but before it is being cancelled via other
-     *                     already issued outstanding references.
      * @return true if a recovery was cancelled
      */
-    public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate<RecoveryTarget> shouldCancel) {
+    public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
         boolean cancelled = false;
         for (RecoveryTarget status : onGoingRecoveries.values()) {
             if (status.shardId().equals(shardId)) {
-                boolean cancel = false;
-                // if we can't increment the status, the recovery is not there any more.
-                if (status.tryIncRef()) {
-                    try {
-                        cancel = shouldCancel.test(status);
-                    } finally {
-                        status.decRef();
-                    }
-                }
-                if (cancel && cancelRecovery(status.recoveryId(), reason)) {
-                    cancelled = true;
-                }
+                cancelled |= cancelRecovery(status.recoveryId(), reason);
             }
         }
         return cancelled;
@@ -124,13 +124,10 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
      *
      * @param reason reason for cancellation
      * @param shardId shardId for which to cancel recoveries
-     * @param shouldCancel a predicate to check if a recovery should be cancelled or not. Null means cancel without an extra check.
-     *                     note that the recovery state can change after this check, but before it is being cancelled via other
-     *                     already issued outstanding references.
      * @return true if a recovery was cancelled
      */
-    public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate<RecoveryTarget> shouldCancel) {
-        return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel);
+    public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
+        return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason);
     }
 
     public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final
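With the predicate overload gone, cancellation is now all-or-nothing per shard; deciding whether the recovery source actually changed happens at the call site instead. A condensed restatement of the caller shown in the IndicesClusterStateService hunk above (shardId stands for currentRoutingEntry.shardId() in the patch):

    if (indexShard.recoveryState().getSourceNode().equals(sourceNode) == false) {
        // the source node changed; cancel whatever recovery is still running for this shard
        recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed");
    }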
@@ -980,8 +980,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
         assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
         assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
-        routing = ShardRoutingHelper.moveToStarted(routing);
-        newShard.updateRoutingEntry(routing, true);
+        newShard.updateRoutingEntry(routing.moveToStarted(), true);
         SearchResponse response = client().prepareSearch().get();
         assertHitCount(response, 1);
     }
@@ -1009,8 +1008,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
         assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
         assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
-        routing = ShardRoutingHelper.moveToStarted(routing);
-        newShard.updateRoutingEntry(routing, true);
+        newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true);
         SearchResponse response = client().prepareSearch().get();
         assertHitCount(response, 0);
     }
@@ -1060,8 +1058,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
         assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode));
 
-        routing = ShardRoutingHelper.moveToStarted(routing);
-        newShard.updateRoutingEntry(routing, true);
+        newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true);
         SearchResponse response = client().prepareSearch().get();
         assertHitCount(response, 0);
         // we can't issue this request through a client because of the inconsistencies we created with the cluster state
@@ -1147,8 +1144,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
             }
         }, localNode));
 
-        routing = ShardRoutingHelper.moveToStarted(routing);
-        test_target_shard.updateRoutingEntry(routing, true);
+        test_target_shard.updateRoutingEntry(routing.moveToStarted(), true);
         assertHitCount(client().prepareSearch("test_target").get(), 1);
         assertSearchHits(client().prepareSearch("test_target").get(), "0");
     }
@@ -1355,7 +1351,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         };
         final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener);
         shardRef.set(newShard);
-        recoverShard(newShard, shard.routingEntry());
+        recoverShard(newShard);
 
         try {
             ExceptionsHelper.rethrowAndSuppress(failures);
@@ -1396,22 +1392,20 @@ public class IndexShardTests extends ESSingleNodeTestCase {
 
     public static final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
         IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners);
-        return recoverShard(newShard, shard.routingEntry());
+        return recoverShard(newShard);
     }
 
-    public static final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException {
-        ShardRouting routing = ShardRoutingHelper.reinit(oldRouting);
-        newShard.updateRoutingEntry(routing, false);
+    public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
         DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
-        newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
+        newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode));
         assertTrue(newShard.recoverFromStore(localNode));
-        routing = ShardRoutingHelper.moveToStarted(routing);
-        newShard.updateRoutingEntry(routing, true);
+        newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true);
         return newShard;
     }
 
     public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
-        IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
+        ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
+        IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
             shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
             indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
             indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)
@@ -1419,6 +1413,14 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         return newShard;
     }
 
+    private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(),
+            existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING,
+            existingShardRouting.allocationId());
+        shardRouting = shardRouting.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery"));
+        return shardRouting;
+    }
+
     public void testTranslogRecoverySyncsTranslog() throws IOException {
         createIndex("testindexfortranslogsync");
         client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject()
@@ -1432,7 +1434,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService(resolveIndex("testindexfortranslogsync"));
         IndexShard shard = test.getShardOrNull(0);
-        ShardRouting routing = shard.routingEntry();
+        ShardRouting routing = getInitializingShardRouting(shard.routingEntry());
         test.removeShard(0, "b/c britta says so");
         IndexShard newShard = test.createShard(routing);
         DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
@@ -1459,7 +1461,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService(resolveIndex("index"));
         IndexShard shard = test.getShardOrNull(0);
-        ShardRouting routing = shard.routingEntry();
+        ShardRouting routing = getInitializingShardRouting(shard.routingEntry());
         test.removeShard(0, "b/c britta says so");
         IndexShard newShard = test.createShard(routing);
         newShard.shardRouting = routing;
@@ -1488,10 +1490,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService(resolveIndex("index"));
         IndexShard shard = test.getShardOrNull(0);
-        ShardRouting routing = shard.routingEntry();
+        ShardRouting routing = getInitializingShardRouting(shard.routingEntry());
         test.removeShard(0, "b/c britta says so");
         IndexShard newShard = test.createShard(routing);
-        newShard.shardRouting = routing;
         DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
         newShard.markAsRecovering("for testing", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.REPLICA, localNode, localNode));
         // Shard is still inactive since we haven't started recovering yet
@@ -448,15 +448,14 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
         shardRef.set(newShard);
         try {
             assertEquals(0, imc.availableShards().size());
-            ShardRouting routing = ShardRoutingHelper.reinit(shard.routingEntry());
-            newShard.updateRoutingEntry(routing, false);
+            ShardRouting routing = newShard.routingEntry();
             DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
             newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
 
             assertEquals(1, imc.availableShards().size());
             assertTrue(newShard.recoverFromStore(localNode));
             assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
-            newShard.updateRoutingEntry(ShardRoutingHelper.moveToStarted(routing), true);
+            newShard.updateRoutingEntry(routing.moveToStarted(), true);
         } finally {
             newShard.close("simon says", false);
         }
@@ -104,7 +104,7 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
 
     }
 
-    public void testRecoveryCancellationNoPredicate() throws Exception {
+    public void testRecoveryCancellation() throws Exception {
         createIndex();
         final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
         final long recoveryId = startRecovery(collection);
@@ -119,39 +119,6 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
         }
     }
 
-    public void testRecoveryCancellationPredicate() throws Exception {
-        createIndex();
-        final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
-        final long recoveryId = startRecovery(collection);
-        final long recoveryId2 = startRecovery(collection);
-        final ArrayList<AutoCloseable> toClose = new ArrayList<>();
-        try {
-            RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId);
-            toClose.add(recoveryRef);
-            ShardId shardId = recoveryRef.status().shardId();
-            assertFalse("should not have cancelled recoveries", collection.cancelRecoveriesForShard(shardId, "test", status -> false));
-            final Predicate<RecoveryTarget> shouldCancel = status -> status.recoveryId() == recoveryId;
-            assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test", shouldCancel));
-            assertThat("we should still have on recovery", collection.size(), equalTo(1));
-            recoveryRef = collection.getRecovery(recoveryId);
-            toClose.add(recoveryRef);
-            assertNull("recovery should have been deleted", recoveryRef);
-            recoveryRef = collection.getRecovery(recoveryId2);
-            toClose.add(recoveryRef);
-            assertNotNull("recovery should NOT have been deleted", recoveryRef);
-
-        } finally {
-            // TODO: do we want a lucene IOUtils version of this?
-            for (AutoCloseable closeable : toClose) {
-                if (closeable != null) {
-                    closeable.close();
-                }
-            }
-            collection.cancelRecovery(recoveryId, "meh");
-            collection.cancelRecovery(recoveryId2, "meh");
-        }
-    }
-
     protected void createIndex() {
         createIndex("test",
             Settings.builder()