improve peer recovery shard state handling. when throttling a recovery, don't restore the non recovering state and then move to recovery again with each retry, keep the recovering state while retrying

This commit is contained in:
kimchy 2010-08-12 12:31:27 +03:00
parent c0a7dc327c
commit b1d1f1ff94
3 changed files with 22 additions and 37 deletions

View File

@ -99,37 +99,39 @@ public class RecoveryTarget extends AbstractComponent {
});
}
public void startRecovery(final StartRecoveryRequest request, final RecoveryListener listener) {
public void startRecovery(final StartRecoveryRequest request, boolean fromRetry, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update");
return;
}
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
// mark the shard as recovering
IndexShardState preRecoveryState;
try {
preRecoveryState = shard.recovering();
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
if (!fromRetry) {
try {
shard.recovering();
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
return;
}
}
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
final IndexShardState fPreRecoveryState = preRecoveryState;
threadPool.cached().execute(new Runnable() {
@Override public void run() {
doRecovery(shard, fPreRecoveryState, request, listener);
doRecovery(shard, request, listener);
}
});
}
private void doRecovery(final InternalIndexShard shard, final IndexShardState preRecoveryState, final StartRecoveryRequest request, final RecoveryListener listener) {
// we know we are on a thread, we can spin till we can engage in recovery
private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoveryListener listener) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
shard.restoreRecoveryState(preRecoveryState);
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
@ -151,7 +153,6 @@ public class RecoveryTarget extends AbstractComponent {
}
logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
removeAndCleanOnGoingRecovery(request.shardId());
shard.restoreRecoveryState(preRecoveryState);
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
@ -193,11 +194,6 @@ public class RecoveryTarget extends AbstractComponent {
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
try {
shard.restoreRecoveryState(preRecoveryState);
} catch (IndexShardNotRecoveringException e1) {
// ignore this, we might be closing...
}
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}

View File

@ -148,17 +148,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
public InternalIndexShard restoreRecoveryState(IndexShardState stateToRestore) {
synchronized (mutex) {
if (this.state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
logger.debug("state: [{}]->[{}], restored after recovery", state, stateToRestore);
this.state = stateToRestore;
}
return this;
}
public InternalIndexShard relocated() throws IndexShardNotStartedException {
synchronized (mutex) {
if (state != IndexShardState.STARTED) {

View File

@ -324,7 +324,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().listWithMd5());
recoveryTarget.startRecovery(request, new PeerRecoveryListener(request, shardRouting, indexService));
recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
break;
@ -354,7 +354,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().listWithMd5());
recoveryTarget.startRecovery(request, new PeerRecoveryListener(request, shardRouting, indexService));
recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
}
@ -383,7 +383,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override public void onRetryRecovery(TimeValue retryAfter) {
threadPool.schedule(new Runnable() {
@Override public void run() {
recoveryTarget.startRecovery(request, PeerRecoveryListener.this);
recoveryTarget.startRecovery(request, true, PeerRecoveryListener.this);
}
}, retryAfter);
}